autopilot: add OnChannelPendingOpen state update

This commit is contained in:
Wilmer Paulino 2018-08-06 18:18:05 -07:00
parent 956acdfa14
commit aa7c2e6d47
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 124 additions and 1 deletions

@ -186,13 +186,18 @@ type balanceUpdate struct {
balanceDelta btcutil.Amount
}
// chanOpenUpdate is a type of external state update the indicates a new
// chanOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
newChan Channel
}
// chanPendingOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the agent itself or an external subsystem,
// but is still pending.
type chanPendingOpenUpdate struct{}
// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
type chanOpenFailureUpdate struct{}
@ -231,6 +236,18 @@ func (a *Agent) OnChannelOpen(c Channel) {
}()
}
// OnChannelPendingOpen is a callback that should be executed each time a new
// channel is opened, either by the agent or an external subsystems, but is
// still pending.
func (a *Agent) OnChannelPendingOpen() {
go func() {
select {
case a.stateUpdates <- &chanPendingOpenUpdate{}:
case <-a.quit:
}
}()
}
// OnChannelOpenFailure is a callback that should be executed when the
// autopilot has attempted to open a channel, but failed. In this case we can
// retry channel creation with a different node.
@ -382,6 +399,12 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
updateBalance()
// A new channel has been opened by the agent or an
// external subsystem, but is still pending
// confirmation.
case *chanPendingOpenUpdate:
updateBalance()
// A channel has been closed, this may free up an
// available slot, triggering a new channel update.
case *chanCloseUpdate:
@ -598,6 +621,12 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
err)
}
}
// Since the channel open was successful
// and is currently pending, we'll
// trigger the autopilot agent to query
// for more peers.
a.OnChannelPendingOpen()
}(chanCandidate)
}
pendingMtx.Unlock()

@ -1089,3 +1089,97 @@ func TestAgentPendingChannelState(t *testing.T) {
t.Fatalf("select wasn't queried in time")
}
}
// TestAgentPendingOpenChannel ensures that the agent queries its heuristic once
// it detects a channel is pending open. This allows the agent to use its own
// change outputs that have yet to confirm for funding transactions.
func TestAgentPendingOpenChannel(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
// With the dependencies we created, we can now create the initial
// agent itself.
cfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
MaxPendingOpens: 10,
}
agent, err := New(cfg, nil)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// To ensure the heuristic doesn't block on quitting the agent, we'll
// use the agent's quit chan to signal when it should also stop.
heuristic.quit = agent.quit
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
// We'll send an initial "no" response to advance the agent past its
// initial check.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// Next, we'll signal that a new channel has been opened, but it is
// still pending.
agent.OnChannelPendingOpen()
// The agent should now query the heuristic in order to determine its
// next action as its local state has now been modified.
wg.Add(1)
go func() {
defer wg.Done()
select {
case heuristic.moreChansResps <- moreChansResp{false, 0, 0}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
default:
}
}