From e9cc7492a9afa1441eaf71cef97ef0fcca8286dd Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 9 Aug 2018 10:03:14 +0200 Subject: [PATCH] server: add goroutine watchChannelStatus This commit adds a goroutine watchChannelStatus to the server, which will query the switch for the status of all open channels every InactiveChanTimeout / 4. If a channel's status has remained unchanged during the last InactiveChanTimeout it'll send out a ChannelUpdate setting the disabled bit accordingly. --- server.go | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/server.go b/server.go index 2a472b5d..125e0425 100644 --- a/server.go +++ b/server.go @@ -993,6 +993,11 @@ func (s *server) Start() error { srvrLog.Infof("Auto peer bootstrapping is disabled") } + // Start a goroutine that will periodically send out ChannelUpdates + // based on a channel's status. + s.wg.Add(1) + go s.watchChannelStatus() + return nil } @@ -3024,3 +3029,112 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error { return ErrServerShuttingDown } } + +// watchChannelStatus periodically queries the Switch for the status of the +// open channels, and sends out ChannelUpdates to the network indicating their +// active status. Currently we'll send out either a Disabled or Active update +// if the channel has been in the same status over a given amount of time. +// +// NOTE: This MUST be run as a goroutine. +func (s *server) watchChannelStatus() { + defer s.wg.Done() + + // A map with values activeStatus is used to keep track of the first + // time we saw a link changing to the current active status. + type activeStatus struct { + active bool + time time.Time + } + status := make(map[wire.OutPoint]activeStatus) + + // We'll check in on the channel statuses every 1/4 of the timeout. + unchangedTimeout := cfg.InactiveChanTimeout + tickerTimeout := unchangedTimeout / 4 + + if unchangedTimeout == 0 || tickerTimeout == 0 { + srvrLog.Debugf("Won't watch channel statuses") + return + } + + ticker := time.NewTicker(tickerTimeout) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + channels, err := s.chanDB.FetchAllOpenChannels() + if err != nil { + srvrLog.Errorf("Unable to fetch open "+ + "channels: %v", err) + continue + } + + // For each open channel, update the status. We'll copy + // the updated statuses to a new map, to avoid keeping + // the status of closed channels around. + newStatus := make(map[wire.OutPoint]activeStatus) + for _, c := range channels { + chanID := lnwire.NewChanIDFromOutPoint( + &c.FundingOutpoint) + + // Get the current active stauts from the + // Switch. + active := s.htlcSwitch.HasActiveLink(chanID) + + var currentStatus activeStatus + + // If this link is not in the map, or the + // status has changed, set an updated active + // status. + st, ok := status[c.FundingOutpoint] + if !ok || st.active != active { + currentStatus = activeStatus{ + active: active, + time: time.Now(), + } + } else { + // The status is unchanged, we'll keep + // it as is. + currentStatus = st + } + + newStatus[c.FundingOutpoint] = currentStatus + } + + // Set the status map to the map of new statuses. + status = newStatus + + // If no change in status has happened during the last + // interval, we'll send out an update. Note that we add + // the negative of the timeout to set our limit in the + // past. + limit := time.Now().Add(-unchangedTimeout) + + // We'll send out an update for all channels that have + // had their status unchanged for longer than the limit. + for op, st := range status { + disable := !st.active + + if st.time.Before(limit) { + // Before we attempt to announce the + // status of the channel, we remove it + // from the status map such that it + // will need a full unchaged interval + // before we attempt to announce its + // status again. + delete(status, op) + + err = s.announceChanStatus(op, disable) + if err != nil { + srvrLog.Errorf("Unable to "+ + "disable channel: %v", + err) + } + } + } + + case <-s.quit: + return + } + } +}