autopilot/agent: signal nodeUpdates on own channel

We do this to avoid a huge amount of goroutines piling up on initial
graph sync, as they will all block trying to send the node update on the
stateUpdates channel. Now we instead make a new buffered channel
nodeUpdates, and just return immediately if there is already a signal in
the channel waiting to be processed.
This commit is contained in:
Johan T. Halseth 2018-08-31 08:55:41 +02:00
parent 3e992f094d
commit 4a88c61a90
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -117,6 +117,11 @@ type Agent struct {
// affect the heuristics of the agent will be sent over. // affect the heuristics of the agent will be sent over.
stateUpdates chan interface{} stateUpdates chan interface{}
// nodeUpdates is a channel that changes to the graph node landscape
// will be sent over. This channel will be buffered to ensure we have
// at most one pending update of this type to handle at a given time.
nodeUpdates chan *nodeUpdates
// totalBalance is the total number of satoshis the backing wallet is // totalBalance is the total number of satoshis the backing wallet is
// known to control at any given instance. This value will be updated // known to control at any given instance. This value will be updated
// when the agent receives external balance update signals. // when the agent receives external balance update signals.
@ -136,6 +141,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
chanState: make(map[lnwire.ShortChannelID]Channel), chanState: make(map[lnwire.ShortChannelID]Channel),
quit: make(chan struct{}), quit: make(chan struct{}),
stateUpdates: make(chan interface{}), stateUpdates: make(chan interface{}),
nodeUpdates: make(chan *nodeUpdates, 1),
} }
for _, c := range initialState { for _, c := range initialState {
@ -223,15 +229,10 @@ func (a *Agent) OnBalanceChange() {
// OnNodeUpdates is a callback that should be executed each time our channel // OnNodeUpdates is a callback that should be executed each time our channel
// graph has new nodes or their node announcements are updated. // graph has new nodes or their node announcements are updated.
func (a *Agent) OnNodeUpdates() { func (a *Agent) OnNodeUpdates() {
a.wg.Add(1)
go func() {
defer a.wg.Done()
select { select {
case a.stateUpdates <- &nodeUpdates{}: case a.nodeUpdates <- &nodeUpdates{}:
case <-a.quit: default:
} }
}()
} }
// OnChannelOpen is a callback that should be executed each time a new channel // OnChannelOpen is a callback that should be executed each time a new channel
@ -430,15 +431,14 @@ func (a *Agent) controller() {
} }
updateBalance() updateBalance()
}
// New nodes have been added to the graph or their node // New nodes have been added to the graph or their node
// announcements have been updated. We will consider // announcements have been updated. We will consider opening
// opening channels to these nodes if we haven't // channels to these nodes if we haven't stabilized.
// stabilized. case <-a.nodeUpdates:
case *nodeUpdates:
log.Infof("Node updates received, assessing " + log.Infof("Node updates received, assessing " +
"need for more channels") "need for more channels")
}
// The agent has been signalled to exit, so we'll bail out // The agent has been signalled to exit, so we'll bail out
// immediately. // immediately.