From de282172a19eac72b46fdf8f7db473cf8a036ac3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 14 Feb 2019 17:13:44 -0800 Subject: [PATCH] peer+server+test_utlils: use new ChanStatusManager This commit hooks up the new netann.ChanStatusManager, replacing the prior method which used the watchChannelStatus goroutine. --- peer.go | 130 +++++++++++++++++++--------- server.go | 228 +++++++------------------------------------------- test_utils.go | 28 ++++++- 3 files changed, 149 insertions(+), 237 deletions(-) diff --git a/peer.go b/peer.go index ee44655e..976804e9 100644 --- a/peer.go +++ b/peer.go @@ -142,12 +142,21 @@ type peer struct { // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoingMsg + // activeChanMtx protects access to the activeChannels and + // addedChannels maps. + activeChanMtx sync.RWMutex + // activeChannels is a map which stores the state machines of all // active channels. Channels are indexed into the map by the txid of // the funding transaction which opened the channel. - activeChanMtx sync.RWMutex activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel + // addedChannels tracks any new channels opened during this peer's + // lifecycle. We use this to filter out these new channels when the time + // comes to request a reenable for active channels, since they will have + // waited a shorter duration. + addedChannels map[lnwire.ChannelID]struct{} + // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. newChannels chan *newChannelMsg @@ -172,6 +181,11 @@ type peer struct { // well as lnwire.ClosingSigned messages. chanCloseMsgs chan *closeMsg + // chanActiveTimeout specifies the duration the peer will wait to + // request a channel reenable, beginning from the time the peer was + // started. 
+ chanActiveTimeout time.Duration + server *server // localFeatures is the set of local features that we advertised to the @@ -212,7 +226,8 @@ var _ lnpeer.Peer = (*peer)(nil) // pointer to the main server. func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, addr *lnwire.NetAddress, inbound bool, - localFeatures *lnwire.RawFeatureVector) (*peer, error) { + localFeatures *lnwire.RawFeatureVector, + chanActiveTimeout time.Duration) (*peer, error) { nodePub := addr.IdentityKey @@ -230,6 +245,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, sendQueue: make(chan outgoingMsg), outgoingQueue: make(chan outgoingMsg), + addedChannels: make(map[lnwire.ChannelID]struct{}), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), newChannels: make(chan *newChannelMsg, 1), @@ -239,6 +255,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, chanCloseMsgs: make(chan *closeMsg), failedChannels: make(map[lnwire.ChannelID]struct{}), + chanActiveTimeout: chanActiveTimeout, + writeBuf: server.writeBufferPool.Take(), queueQuit: make(chan struct{}), @@ -392,7 +410,6 @@ func (p *peer) QuitSignal() <-chan struct{} { // loadActiveChannels creates indexes within the peer for tracking all active // channels returned by the database. func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { - var activePublicChans []wire.OutPoint for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, dbChan, @@ -499,33 +516,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { p.activeChanMtx.Lock() p.activeChannels[chanID] = lnChan p.activeChanMtx.Unlock() - - // To ensure we can route through this channel now that the peer - // is back online, we'll attempt to send an update to enable it. - // This will only be used for non-pending public channels, as - // they are the only ones capable of routing. 
- chanIsPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 - if chanIsPublic && !dbChan.IsPending { - activePublicChans = append(activePublicChans, *chanPoint) - } } - // As a final measure we launch a goroutine that will ensure the newly - // loaded public channels are not currently disabled, as that will make - // us skip it during path finding. - go func() { - for _, chanPoint := range activePublicChans { - // Set the channel disabled=false by sending out a new - // ChannelUpdate. If this channel is already active, - // the update won't be sent. - err := p.server.announceChanStatus(chanPoint, false) - if err != nil && err != channeldb.ErrEdgeNotFound { - srvrLog.Errorf("Unable to enable channel %v: %v", - chanPoint, err) - } - } - }() - return nil } @@ -1580,6 +1572,11 @@ func (p *peer) genDeliveryScript() ([]byte, error) { func (p *peer) channelManager() { defer p.wg.Done() + // reenableTimeout will fire once after the configured channel status + // interval has elapsed. This will trigger us to sign new channel + // updates and broadcast them with the "disabled" flag unset. + reenableTimeout := time.After(p.chanActiveTimeout) + out: for { select { @@ -1641,6 +1638,7 @@ out: } p.activeChannels[chanID] = lnChan + p.addedChannels[chanID] = struct{}{} p.activeChanMtx.Unlock() peerLog.Infof("New channel active ChannelPoint(%v) "+ @@ -1782,6 +1780,25 @@ out: // relevant sub-systems and launching a goroutine to // wait for close tx conf. p.finalizeChanClosure(chanCloser) + + // The channel reannounce delay has elapsed, broadcast the + // reenabled channel updates to the network. This should only + // fire once, so we set the reenableTimeout channel to nil to + // mark it for garbage collection. If the peer is torn down + // before firing, reenabling will not be attempted. 
+ // TODO(conner): consolidate reenable timers inside chan status + // manager + case <-reenableTimeout: + p.reenableActiveChannels() + + // Since this channel will never fire again during the + // lifecycle of the peer, we nil the channel to mark it + // eligible for garbage collection, and make this + // explicitly ineligible to receive in future calls to + // select. This also shaves a few CPU cycles since the + // select will ignore this case entirely. + reenableTimeout = nil + case <-p.quit: // As, we've been signalled to exit, we'll reset all @@ -1797,6 +1814,49 @@ out: } } +// reenableActiveChannels searches the index of channels maintained with this +// peer, and reenables each public, non-pending channel. This is done at the +// gossip level by broadcasting a new ChannelUpdate with the disabled bit unset. +// No message will be sent if the channel is already enabled. +func (p *peer) reenableActiveChannels() { + // First, filter all known channels with this peer for ones that are + // both public and not pending. + var activePublicChans []wire.OutPoint + p.activeChanMtx.RLock() + for chanID, lnChan := range p.activeChannels { + dbChan := lnChan.State() + isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 + if !isPublic || dbChan.IsPending { + continue + } + + // We'll also skip any channels added during this peer's + // lifecycle since they haven't waited out the timeout. Their + // first announcement will be enabled, and the chan status + // manager will begin monitoring them passively since they exist + // in the database. + if _, ok := p.addedChannels[chanID]; ok { + continue + } + + activePublicChans = append( + activePublicChans, dbChan.FundingOutpoint, + ) + } + p.activeChanMtx.RUnlock() + + // For each of the public, non-pending channels, set the channel + // disabled bit to false and send out a new ChannelUpdate. If this + // channel is already active, the update won't be sent. 
+ for _, chanPoint := range activePublicChans { + err := p.server.chanStatusMgr.RequestEnable(chanPoint) + if err != nil { + srvrLog.Errorf("Unable to enable channel %v: %v", + chanPoint, err) + } + } +} + // fetchActiveChanCloser attempts to fetch the active chan closer state machine // for the target channel ID. If the channel isn't active an error is returned. // Otherwise, either an existing state machine will be returned, or a new one @@ -1854,11 +1914,8 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e channel: channel, unregisterChannel: p.server.htlcSwitch.RemoveLink, broadcastTx: p.server.cc.wallet.PublishTransaction, - disableChannel: func(op wire.OutPoint) error { - return p.server.announceChanStatus(op, - true) - }, - quit: p.quit, + disableChannel: p.server.chanStatusMgr.RequestDisable, + quit: p.quit, }, deliveryAddr, feePerKw, @@ -1917,11 +1974,8 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { channel: channel, unregisterChannel: p.server.htlcSwitch.RemoveLink, broadcastTx: p.server.cc.wallet.PublishTransaction, - disableChannel: func(op wire.OutPoint) error { - return p.server.announceChanStatus(op, - true) - }, - quit: p.quit, + disableChannel: p.server.chanStatusMgr.RequestDisable, + quit: p.quit, }, deliveryAddr, req.TargetFeePerKw, diff --git a/server.go b/server.go index 078bd9cc..4d2b526d 100644 --- a/server.go +++ b/server.go @@ -89,6 +89,8 @@ type server struct { // that's backed by the identity private key of the running lnd node. nodeSigner *netann.NodeSigner + chanStatusMgr *netann.ChanStatusManager + // listenAddrs is the list of addresses the server is currently // listening on. listenAddrs []net.Addr @@ -179,11 +181,6 @@ type server struct { // changed since last start. currentNodeAnn *lnwire.NodeAnnouncement - // sendDisabled is used to keep track of the disabled flag of the last - // sent ChannelUpdate from announceChanStatus. 
- sentDisabled map[wire.OutPoint]bool - sentDisabledMtx sync.Mutex - quit chan struct{} wg sync.WaitGroup @@ -300,7 +297,6 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, outboundPeers: make(map[string]*peer), peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerDisconnectedListeners: make(map[string][]chan<- struct{}), - sentDisabled: make(map[wire.OutPoint]bool), globalFeatures: lnwire.NewFeatureVector(globalFeatures, lnwire.GlobalFeatures), @@ -369,6 +365,24 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, return nil, err } + chanStatusMgrCfg := &netann.ChanStatusConfig{ + ChanStatusSampleInterval: cfg.ChanStatusSampleInterval, + ChanEnableTimeout: cfg.ChanEnableTimeout, + ChanDisableTimeout: cfg.ChanDisableTimeout, + OurPubKey: privKey.PubKey(), + MessageSigner: s.nodeSigner, + IsChannelActive: s.htlcSwitch.HasActiveLink, + ApplyChannelUpdate: s.applyChannelUpdate, + DB: chanDB, + Graph: chanDB.ChannelGraph(), + } + + chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg) + if err != nil { + return nil, err + } + s.chanStatusMgr = chanStatusMgr + // If enabled, use either UPnP or NAT-PMP to automatically configure // port forwarding for users behind a NAT. 
if cfg.NAT { @@ -739,9 +753,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, return ErrServerShuttingDown } }, - DisableChannel: func(op wire.OutPoint) error { - return s.announceChanStatus(op, true) - }, + DisableChannel: s.chanStatusMgr.RequestDisable, Sweeper: s.sweeper, SettleInvoice: s.invoices.SettleInvoice, NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent, @@ -1030,6 +1042,9 @@ func (s *server) Start() error { if err := s.invoices.Start(); err != nil { return err } + if err := s.chanStatusMgr.Start(); err != nil { + return err + } // With all the relevant sub-systems started, we'll now attempt to // establish persistent connections to our direct channel collaborators @@ -1060,11 +1075,6 @@ func (s *server) Start() error { srvrLog.Infof("Auto peer bootstrapping is disabled") } - // Start a goroutine that will periodically send out ChannelUpdates - // based on a channel's status. - s.wg.Add(1) - go s.watchChannelStatus() - return nil } @@ -1085,6 +1095,7 @@ func (s *server) Stop() error { } // Shutdown the wallet, funding manager, and the rpc server. + s.chanStatusMgr.Stop() s.sigPool.Stop() s.cc.chainNotifier.Stop() s.chanRouter.Stop() @@ -2414,7 +2425,10 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // Now that we've established a connection, create a peer, and it to // the set of currently active peers. - p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures) + p, err := newPeer( + conn, connReq, s, peerAddr, inbound, localFeatures, + cfg.ChanEnableTimeout, + ) if err != nil { srvrLog.Errorf("unable to create peer %v", err) return @@ -2995,68 +3009,6 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error) return node.Addresses[0], nil } -// announceChanStatus disables a channel if disabled=true, otherwise activates -// it. This is done by sending a new channel update across the network with the -// disabled flag set accordingly. 
The result of disabling the channel is it not -// being able to forward payments. -func (s *server) announceChanStatus(op wire.OutPoint, disabled bool) error { - s.sentDisabledMtx.Lock() - defer s.sentDisabledMtx.Unlock() - - // If we have already sent out an update reflecting the current status, - // skip this channel. - alreadyDisabled, ok := s.sentDisabled[op] - if ok && alreadyDisabled == disabled { - return nil - } - - // Retrieve the latest update for this channel. We'll use this - // as our starting point to send the new update. - chanUpdate, err := s.fetchLastChanUpdateByOutPoint(op) - if err != nil { - return err - } - - // Now, sign a new update toggling the disable bit. - err = netann.SignChannelUpdate( - s.nodeSigner, s.identityPriv.PubKey(), chanUpdate, - netann.ChannelUpdateSetDisable(disabled), - ) - if err != nil { - return err - } - - srvrLog.Debugf("Announcing channel(%v) disabled=%v", op, disabled) - - // Once signed, we'll send the new update to all of our peers. - if err := s.applyChannelUpdate(chanUpdate); err != nil { - return err - } - - // We'll keep track of the status set in the last update we sent, to - // avoid sending updates if nothing has changed. - s.sentDisabled[op] = disabled - - return nil -} - -// fetchLastChanUpdateByOutPoint fetches the latest policy for our direction of -// a channel, and crafts a new ChannelUpdate with this policy. Returns an error -// in case our ChannelEdgePolicy is not found in the database. -func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) ( - *lnwire.ChannelUpdate, error) { - - // Get the edge info and policies for this channel from the graph. 
- graph := s.chanDB.ChannelGraph() - info, edge1, edge2, err := graph.FetchChannelEdgesByOutpoint(&op) - if err != nil { - return nil, err - } - - pubKey := s.identityPriv.PubKey().SerializeCompressed() - return netann.ExtractChannelUpdate(pubKey, info, edge1, edge2) -} - // fetchLastChanUpdate returns a function which is able to retrieve our latest // channel update for a target channel. func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) ( @@ -3068,6 +3020,7 @@ func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) ( if err != nil { return nil, err } + return netann.ExtractChannelUpdate( ourPubKey[:], info, edge1, edge2, ) @@ -3086,124 +3039,3 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error { return ErrServerShuttingDown } } - -// watchChannelStatus periodically queries the Switch for the status of the -// open channels, and sends out ChannelUpdates to the network indicating their -// active status. Currently we'll send out either a Disabled or Active update -// if the channel has been in the same status over a given amount of time. -// -// NOTE: This MUST be run as a goroutine. -func (s *server) watchChannelStatus() { - defer s.wg.Done() - - // A map with values activeStatus is used to keep track of the first - // time we saw a link changing to the current active status. - type activeStatus struct { - active bool - time time.Time - } - status := make(map[wire.OutPoint]activeStatus) - - // We'll check in on the channel statuses every 1/4 of the timeout. 
- unchangedTimeout := cfg.ChanDisableTimeout - tickerTimeout := unchangedTimeout / 4 - - if unchangedTimeout == 0 || tickerTimeout == 0 { - srvrLog.Debugf("Won't watch channel statuses") - return - } - - ticker := time.NewTicker(tickerTimeout) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - channels, err := s.chanDB.FetchAllOpenChannels() - if err != nil { - srvrLog.Errorf("Unable to fetch open "+ - "channels: %v", err) - continue - } - - // For each open channel, update the status. We'll copy - // the updated statuses to a new map, to avoid keeping - // the status of closed channels around. - newStatus := make(map[wire.OutPoint]activeStatus) - for _, c := range channels { - // We'll skip any private channels, as they - // aren't used for routing within the network by - // other nodes. - if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 { - continue - } - - chanID := lnwire.NewChanIDFromOutPoint( - &c.FundingOutpoint) - - // Get the current active stauts from the - // Switch. - active := s.htlcSwitch.HasActiveLink(chanID) - - var currentStatus activeStatus - - // If this link is not in the map, or the - // status has changed, set an updated active - // status. - st, ok := status[c.FundingOutpoint] - if !ok || st.active != active { - currentStatus = activeStatus{ - active: active, - time: time.Now(), - } - } else { - // The status is unchanged, we'll keep - // it as is. - currentStatus = st - } - - newStatus[c.FundingOutpoint] = currentStatus - } - - // Set the status map to the map of new statuses. - status = newStatus - - // If no change in status has happened during the last - // interval, we'll send out an update. Note that we add - // the negative of the timeout to set our limit in the - // past. - limit := time.Now().Add(-unchangedTimeout) - - // We'll send out an update for all channels that have - // had their status unchanged for longer than the limit. 
- // NOTE: We also make sure to activate any channel when - // we connect to a peer, to make them available for - // path finding immediately. - for op, st := range status { - disable := !st.active - - if st.time.Before(limit) { - // Before we attempt to announce the - // status of the channel, we remove it - // from the status map such that it - // will need a full unchaged interval - // before we attempt to announce its - // status again. - delete(status, op) - - err = s.announceChanStatus(op, disable) - if err != nil && - err != channeldb.ErrEdgeNotFound { - - srvrLog.Errorf("Unable to "+ - "disable channel %v: %v", - op, err) - } - } - } - - case <-s.quit: - return - } - } -} diff --git a/test_utils.go b/test_utils.go index 32f4f850..34db7fcd 100644 --- a/test_utils.go +++ b/test_utils.go @@ -9,6 +9,7 @@ import ( "math/rand" "net" "os" + "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -22,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/shachain" "github.com/lightningnetwork/lnd/ticker" ) @@ -367,8 +369,30 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, if err != nil { return nil, nil, nil, nil, err } + if err = htlcSwitch.Start(); err != nil { + return nil, nil, nil, nil, err + } s.htlcSwitch = htlcSwitch - s.htlcSwitch.Start() + + nodeSignerAlice := netann.NewNodeSigner(aliceKeyPriv) + + const chanActiveTimeout = time.Minute + + chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{ + ChanStatusSampleInterval: 30 * time.Second, + ChanEnableTimeout: chanActiveTimeout, + ChanDisableTimeout: 2 * time.Minute, + DB: dbAlice, + Graph: dbAlice.ChannelGraph(), + MessageSigner: nodeSignerAlice, + OurPubKey: aliceKeyPub, + IsChannelActive: s.htlcSwitch.HasActiveLink, + ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { 
return nil }, + }) + if err != nil { + return nil, nil, nil, nil, err + } + s.chanStatusMgr = chanStatusMgr alicePeer := &peer{ addr: &lnwire.NetAddress{ @@ -387,6 +411,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, localCloseChanReqs: make(chan *htlcswitch.ChanClose), chanCloseMsgs: make(chan *closeMsg), + chanActiveTimeout: chanActiveTimeout, + queueQuit: make(chan struct{}), quit: make(chan struct{}), }