From 4a88c61a90135bb8eb685a4920fd377973590044 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 31 Aug 2018 08:55:41 +0200 Subject: [PATCH] autopilot/agent: signal nodeUpdates on own channel We do this to avoid a huge amount of goroutines piling up on initial graph sync, as they will all block trying to send the node update on the stateUpdates channel. Now we instead make a new buffered channel nodeUpdates, and just return immediately if there is already a signal in the channel waiting to be processed. --- autopilot/agent.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index a401d247..1f04c9d6 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -117,6 +117,11 @@ type Agent struct { // affect the heuristics of the agent will be sent over. stateUpdates chan interface{} + // nodeUpdates is a channel that changes to the graph node landscape + // will be sent over. This channel will be buffered to ensure we have + // at most one pending update of this type to handle at a given time. + nodeUpdates chan *nodeUpdates + // totalBalance is the total number of satoshis the backing wallet is // known to control at any given instance. This value will be updated // when the agent receives external balance update signals. @@ -136,6 +141,7 @@ func New(cfg Config, initialState []Channel) (*Agent, error) { chanState: make(map[lnwire.ShortChannelID]Channel), quit: make(chan struct{}), stateUpdates: make(chan interface{}), + nodeUpdates: make(chan *nodeUpdates, 1), } for _, c := range initialState { @@ -223,15 +229,10 @@ func (a *Agent) OnBalanceChange() { // OnNodeUpdates is a callback that should be executed each time our channel // graph has new nodes or their node announcements are updated. func (a *Agent) OnNodeUpdates() { - a.wg.Add(1) - go func() { - defer a.wg.Done() - - select { - case a.stateUpdates <- &nodeUpdates{}: - case <-a.quit: - } - }() + select { + case a.nodeUpdates <- &nodeUpdates{}: + default: + } } // OnChannelOpen is a callback that should be executed each time a new channel @@ -430,16 +431,15 @@ func (a *Agent) controller() { } updateBalance() - - // New nodes have been added to the graph or their node - // announcements have been updated. We will consider - // opening channels to these nodes if we haven't - // stabilized. - case *nodeUpdates: - log.Infof("Node updates received, assessing " + - "need for more channels") } + // New nodes have been added to the graph or their node + // announcements have been updated. We will consider opening + // channels to these nodes if we haven't stabilized. + case <-a.nodeUpdates: + log.Infof("Node updates received, assessing " + + "need for more channels") + // The agent has been signalled to exit, so we'll bail out // immediately. case <-a.quit: