diff --git a/autopilot/agent.go b/autopilot/agent.go index b2450523..6e521ba6 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -186,13 +186,18 @@ type balanceUpdate struct { balanceDelta btcutil.Amount } -// chanOpenUpdate is a type of external state update the indicates a new +// chanOpenUpdate is a type of external state update that indicates a new // channel has been opened, either by the Agent itself (within the main // controller loop), or by an external user to the system. type chanOpenUpdate struct { newChan Channel } +// chanPendingOpenUpdate is a type of external state update that indicates a new +// channel has been opened, either by the agent itself or an external subsystem, +// but is still pending. +type chanPendingOpenUpdate struct{} + // chanOpenFailureUpdate is a type of external state update that indicates // a previous channel open failed, and that it might be possible to try again. type chanOpenFailureUpdate struct{} @@ -231,6 +236,18 @@ func (a *Agent) OnChannelOpen(c Channel) { }() } +// OnChannelPendingOpen is a callback that should be executed each time a new +// channel is opened, either by the agent or an external subsystems, but is +// still pending. +func (a *Agent) OnChannelPendingOpen() { + go func() { + select { + case a.stateUpdates <- &chanPendingOpenUpdate{}: + case <-a.quit: + } + }() +} + // OnChannelOpenFailure is a callback that should be executed when the // autopilot has attempted to open a channel, but failed. In this case we can // retry channel creation with a different node. @@ -382,6 +399,12 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { updateBalance() + // A new channel has been opened by the agent or an + // external subsystem, but is still pending + // confirmation. + case *chanPendingOpenUpdate: + updateBalance() + // A channel has been closed, this may free up an // available slot, triggering a new channel update. case *chanCloseUpdate: @@ -598,6 +621,12 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { err) } } + + // Since the channel open was successful + // and is currently pending, we'll + // trigger the autopilot agent to query + // for more peers. + a.OnChannelPendingOpen() }(chanCandidate) } pendingMtx.Unlock() diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index eb1009b3..d89913bf 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -1089,3 +1089,97 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("select wasn't queried in time") } } + +// TestAgentPendingOpenChannel ensures that the agent queries its heuristic once +// it detects a channel is pending open. This allows the agent to use its own +// change outputs that have yet to confirm for funding transactions. +func TestAgentPendingOpenChannel(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + // With the dependencies we created, we can now create the initial + // agent itself. + cfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + Graph: memGraph, + MaxPendingOpens: 10, + } + agent, err := New(cfg, nil) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // To ensure the heuristic doesn't block on quitting the agent, we'll + // use the agent's quit chan to signal when it should also stop. + heuristic.quit = agent.quit + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + // We'll send an initial "no" response to advance the agent past its + // initial check. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // Next, we'll signal that a new channel has been opened, but it is + // still pending. + agent.OnChannelPendingOpen() + + // The agent should now query the heuristic in order to determine its + // next action as its local state has now been modified. + wg.Add(1) + go func() { + defer wg.Done() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + }() + + // We'll wait here for either the agent to query the heuristic to be + // queried, or for the timeout above to tick. + wg.Wait() + + // There shouldn't be a call to the Select method as we've returned + // "false" for NeedMoreChans above. + select { + case heuristic.directiveResps <- []AttachmentDirective{}: + t.Fatalf("Select was called but shouldn't have been") + default: + } +}