Merge pull request #1776 from cfromknecht/autopilot-add-node-trigger

[autopilot]: add OnNodeUpdate trigger
This commit is contained in:
Olaoluwa Osuntokun 2018-08-23 20:35:34 -07:00 committed by GitHub
commit 625b210f44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 147 additions and 0 deletions

@ -186,6 +186,10 @@ type balanceUpdate struct {
balanceDelta btcutil.Amount
}
// nodeUpdates is a type of external state update that reflects an addition or
// modification in channel graph node membership.
type nodeUpdates struct{}
// chanOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
@ -222,6 +226,20 @@ func (a *Agent) OnBalanceChange(delta btcutil.Amount) {
}()
}
// OnNodeUpdates is a callback that should be executed each time our channel
// graph has new nodes or their node announcements are updated.
func (a *Agent) OnNodeUpdates() {
a.wg.Add(1)
go func() {
defer a.wg.Done()
select {
case a.stateUpdates <- &nodeUpdates{}:
case <-a.quit:
}
}()
}
// OnChannelOpen is a callback that should be executed each time a new channel
// is manually opened by the user or any system outside the autopilot agent.
func (a *Agent) OnChannelOpen(c Channel) {
@ -417,6 +435,14 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
}
updateBalance()
// New nodes have been added to the graph or their node
// announcements have been updated. We will consider
// opening channels to these nodes if we haven't
// stabilized.
case *nodeUpdates:
log.Infof("Node updates received, assessing " +
"need for more channels")
}
pendingMtx.Lock()

@ -1183,3 +1183,117 @@ func TestAgentPendingOpenChannel(t *testing.T) {
default:
}
}
// TestAgentOnNodeUpdates tests that the agent will wake up in response to the
// OnNodeUpdates signal. This is useful in ensuring that autopilot is always
// pulling in the latest graph updates into its decision making. It also
// prevents the agent from stalling after an initial attempt that finds no nodes
// in the graph.
func TestAgentOnNodeUpdates(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
// With the dependencies we created, we can now create the initial
// agent itself.
cfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
MaxPendingOpens: 10,
}
agent, err := New(cfg, nil)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from an
// empty graph.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case heuristic.moreChansResps <- moreChansResp{
needMore: true,
numMore: 2,
amt: walletBalance,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
// Simulate more nodes being added to the graph by informing the agent
// that we have node updates.
agent.OnNodeUpdates()
// In response, the agent should wake up and see if it needs more
// channels. Since we haven't done anything, we will send the same
// response as before since we are still trying to open channels.
var wg2 sync.WaitGroup
wg2.Add(1)
go func() {
defer wg2.Done()
select {
case heuristic.moreChansResps <- moreChansResp{
needMore: true,
numMore: 2,
amt: walletBalance,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg2.Wait()
// Again the agent should pull in the next set of attachment directives.
// It's not important that this list is also empty, so long as the node
// updates signal is causing the agent to make this attempt.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
}

@ -306,6 +306,13 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
pilot.OnChannelClose(chanID)
}
// If new nodes were added to the graph, or nod
// information has changed, we'll poke autopilot
// to see if it can make use of them.
if len(topChange.NodeUpdates) > 0 {
pilot.OnNodeUpdates()
}
case <-svr.quit:
return
}