diff --git a/autopilot/agent.go b/autopilot/agent.go index d7c14e23..6f137583 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -154,13 +154,8 @@ func (a *Agent) Start() error { log.Infof("Autopilot Agent starting") - startingBalance, err := a.cfg.WalletBalance() - if err != nil { - return err - } - a.wg.Add(1) - go a.controller(startingBalance) + go a.controller() return nil } @@ -183,7 +178,6 @@ func (a *Agent) Stop() error { // balanceUpdate is a type of external state update that reflects an // increase/decrease in the funds currently available to the wallet. type balanceUpdate struct { - balanceDelta btcutil.Amount } // nodeUpdates is a type of external state update that reflects an addition or @@ -214,13 +208,13 @@ type chanCloseUpdate struct { // OnBalanceChange is a callback that should be executed each time the balance of // the backing wallet changes. -func (a *Agent) OnBalanceChange(delta btcutil.Amount) { +func (a *Agent) OnBalanceChange() { a.wg.Add(1) go func() { defer a.wg.Done() select { - case a.stateUpdates <- &balanceUpdate{balanceDelta: delta}: + case a.stateUpdates <- &balanceUpdate{}: case <-a.quit: } }() @@ -342,12 +336,12 @@ func mergeChanState(pendingChans map[NodeID]Channel, // and external state changes as a result of decisions it makes w.r.t channel // allocation, or attributes affecting its control loop being updated by the // backing Lightning Node. -func (a *Agent) controller(startingBalance btcutil.Amount) { +func (a *Agent) controller() { defer a.wg.Done() // We'll start off by assigning our starting balance, and injecting // that amount as an initial wake up to the main controller goroutine. - a.OnBalanceChange(startingBalance) + a.OnBalanceChange() // TODO(roasbeef): do we in fact need to maintain order? // * use sync.Cond if so @@ -388,15 +382,16 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // up an additional channel, or splice in funds to an // existing one. case *balanceUpdate: - log.Debugf("Applying external balance state "+ - "update of: %v", update.balanceDelta) + log.Debug("Applying external balance state " + + "update") - a.totalBalance += update.balanceDelta + updateBalance() // The channel we tried to open previously failed for // whatever reason. case *chanOpenFailureUpdate: - log.Debug("Retrying after previous channel open failure.") + log.Debug("Retrying after previous channel " + + "open failure.") updateBalance() diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index ff47c321..113aad42 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -527,7 +527,8 @@ func TestAgentBalanceUpdate(t *testing.T) { memGraph, _, _ := newMemChanGraph() // The wallet will start with 2 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 2 + var walletBalanceMtx sync.Mutex + walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 2) // With the dependencies we created, we can now create the initial // agent itself. @@ -536,6 +537,8 @@ func TestAgentBalanceUpdate(t *testing.T) { Heuristic: heuristic, ChanController: chanController, WalletBalance: func() (btcutil.Amount, error) { + walletBalanceMtx.Lock() + defer walletBalanceMtx.Unlock() return walletBalance, nil }, ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { @@ -583,8 +586,11 @@ func TestAgentBalanceUpdate(t *testing.T) { // Next we'll send a new balance update signal to the agent, adding 5 // BTC to the amount of available funds. - const balanceDelta = btcutil.SatoshiPerBitcoin * 5 - agent.OnBalanceChange(balanceDelta) + walletBalanceMtx.Lock() + walletBalance += btcutil.SatoshiPerBitcoin * 5 + walletBalanceMtx.Unlock() + + agent.OnBalanceChange() wg = sync.WaitGroup{} @@ -597,11 +603,10 @@ func TestAgentBalanceUpdate(t *testing.T) { // At this point, the local state of the agent should // have also been updated to reflect that the LN node // now has an additional 5BTC available. - const expectedAmt = walletBalance + balanceDelta - if agent.totalBalance != expectedAmt { + if agent.totalBalance != walletBalance { t.Fatalf("expected %v wallet balance "+ "instead have %v", agent.totalBalance, - expectedAmt) + walletBalance) } // With all of our assertions passed, we'll signal the @@ -932,7 +937,8 @@ func TestAgentPendingChannelState(t *testing.T) { memGraph, _, _ := newMemChanGraph() // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 + var walletBalanceMtx sync.Mutex + walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 6) // With the dependencies we created, we can now create the initial // agent itself. @@ -941,6 +947,9 @@ func TestAgentPendingChannelState(t *testing.T) { Heuristic: heuristic, ChanController: chanController, WalletBalance: func() (btcutil.Amount, error) { + walletBalanceMtx.Lock() + defer walletBalanceMtx.Unlock() + return walletBalance, nil }, ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { @@ -1039,7 +1048,11 @@ func TestAgentPendingChannelState(t *testing.T) { // Now, in order to test that the pending state was properly updated, // we'll trigger a balance update in order to trigger a query to the // heuristic. - agent.OnBalanceChange(0.4 * btcutil.SatoshiPerBitcoin) + walletBalanceMtx.Lock() + walletBalance += 0.4 * btcutil.SatoshiPerBitcoin + walletBalanceMtx.Unlock() + + agent.OnBalanceChange() // The heuristic should be queried, and the argument for the set of // channels passed in should include the pending channels that diff --git a/pilot.go b/pilot.go index 219702a7..c745082c 100644 --- a/pilot.go +++ b/pilot.go @@ -55,18 +55,6 @@ func (c *chanController) OpenChannel(target *btcec.PublicKey, updateStream, errChan := c.server.OpenChannel(req) select { case err := <-errChan: - // If we were not able to actually open a channel to the peer - // for whatever reason, then we'll disconnect from the peer to - // ensure we don't accumulate a bunch of unnecessary - // connections. - if err != nil { - dcErr := c.server.DisconnectPeer(target) - if dcErr != nil { - atplLog.Errorf("Unable to disconnect from peer %v", - target.SerializeCompressed()) - } - } - return err case <-updateStream: return nil @@ -172,8 +160,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) // If we weren't able to establish a connection at all, // then we'll error out. if !connected { - return false, fmt.Errorf("unable to connect "+ - "to %x", target.SerializeCompressed()) + return false, errors.New("exhausted all " + + "advertised addresses") } return false, nil @@ -224,8 +212,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) for { select { - case txnUpdate := <-txnSubscription.ConfirmedTransactions(): - pilot.OnBalanceChange(txnUpdate.Value) + case <-txnSubscription.ConfirmedTransactions(): + pilot.OnBalanceChange() case <-svr.quit: return }