server: add goroutine watchChannelStatus
This commit adds a goroutine watchChannelStatus to the server, which will query the switch for the status of all open channels every InactiveChanTimeout / 4. If a channel's status has remained unchanged during the last InactiveChanTimeout it'll send out a ChannelUpdate setting the disabled bit accordingly.
This commit is contained in:
parent
9e44b38eee
commit
e9cc7492a9
114
server.go
114
server.go
@ -993,6 +993,11 @@ func (s *server) Start() error {
|
||||
srvrLog.Infof("Auto peer bootstrapping is disabled")
|
||||
}
|
||||
|
||||
// Start a goroutine that will periodically send out ChannelUpdates
|
||||
// based on a channel's status.
|
||||
s.wg.Add(1)
|
||||
go s.watchChannelStatus()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -3024,3 +3029,112 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
|
||||
return ErrServerShuttingDown
|
||||
}
|
||||
}
|
||||
|
||||
// watchChannelStatus periodically queries the Switch for the status of the
|
||||
// open channels, and sends out ChannelUpdates to the network indicating their
|
||||
// active status. Currently we'll send out either a Disabled or Active update
|
||||
// if the channel has been in the same status over a given amount of time.
|
||||
//
|
||||
// NOTE: This MUST be run as a goroutine.
|
||||
func (s *server) watchChannelStatus() {
|
||||
defer s.wg.Done()
|
||||
|
||||
// A map with values activeStatus is used to keep track of the first
|
||||
// time we saw a link changing to the current active status.
|
||||
type activeStatus struct {
|
||||
active bool
|
||||
time time.Time
|
||||
}
|
||||
status := make(map[wire.OutPoint]activeStatus)
|
||||
|
||||
// We'll check in on the channel statuses every 1/4 of the timeout.
|
||||
unchangedTimeout := cfg.InactiveChanTimeout
|
||||
tickerTimeout := unchangedTimeout / 4
|
||||
|
||||
if unchangedTimeout == 0 || tickerTimeout == 0 {
|
||||
srvrLog.Debugf("Won't watch channel statuses")
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(tickerTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
channels, err := s.chanDB.FetchAllOpenChannels()
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to fetch open "+
|
||||
"channels: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// For each open channel, update the status. We'll copy
|
||||
// the updated statuses to a new map, to avoid keeping
|
||||
// the status of closed channels around.
|
||||
newStatus := make(map[wire.OutPoint]activeStatus)
|
||||
for _, c := range channels {
|
||||
chanID := lnwire.NewChanIDFromOutPoint(
|
||||
&c.FundingOutpoint)
|
||||
|
||||
// Get the current active stauts from the
|
||||
// Switch.
|
||||
active := s.htlcSwitch.HasActiveLink(chanID)
|
||||
|
||||
var currentStatus activeStatus
|
||||
|
||||
// If this link is not in the map, or the
|
||||
// status has changed, set an updated active
|
||||
// status.
|
||||
st, ok := status[c.FundingOutpoint]
|
||||
if !ok || st.active != active {
|
||||
currentStatus = activeStatus{
|
||||
active: active,
|
||||
time: time.Now(),
|
||||
}
|
||||
} else {
|
||||
// The status is unchanged, we'll keep
|
||||
// it as is.
|
||||
currentStatus = st
|
||||
}
|
||||
|
||||
newStatus[c.FundingOutpoint] = currentStatus
|
||||
}
|
||||
|
||||
// Set the status map to the map of new statuses.
|
||||
status = newStatus
|
||||
|
||||
// If no change in status has happened during the last
|
||||
// interval, we'll send out an update. Note that we add
|
||||
// the negative of the timeout to set our limit in the
|
||||
// past.
|
||||
limit := time.Now().Add(-unchangedTimeout)
|
||||
|
||||
// We'll send out an update for all channels that have
|
||||
// had their status unchanged for longer than the limit.
|
||||
for op, st := range status {
|
||||
disable := !st.active
|
||||
|
||||
if st.time.Before(limit) {
|
||||
// Before we attempt to announce the
|
||||
// status of the channel, we remove it
|
||||
// from the status map such that it
|
||||
// will need a full unchaged interval
|
||||
// before we attempt to announce its
|
||||
// status again.
|
||||
delete(status, op)
|
||||
|
||||
err = s.announceChanStatus(op, disable)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to "+
|
||||
"disable channel: %v",
|
||||
err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user