autopilot: use updateBalance rather than tracking balance explicitly

In this commit, we modify the balanceUpdate autopilot signal to update
the balance according to what's returned to the WalletBalance callback
rather than explicitly tracking the balance. This gives the agent a
better sense of what the wallet's balance actually is.
This commit is contained in:
Wilmer Paulino 2018-08-28 19:17:14 -07:00
parent 2f1b024679
commit e1a376d9f8
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 33 additions and 25 deletions

@ -154,13 +154,8 @@ func (a *Agent) Start() error {
log.Infof("Autopilot Agent starting") log.Infof("Autopilot Agent starting")
startingBalance, err := a.cfg.WalletBalance()
if err != nil {
return err
}
a.wg.Add(1) a.wg.Add(1)
go a.controller(startingBalance) go a.controller()
return nil return nil
} }
@ -183,7 +178,6 @@ func (a *Agent) Stop() error {
// balanceUpdate is a type of external state update that reflects an // balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet. // increase/decrease in the funds currently available to the wallet.
type balanceUpdate struct { type balanceUpdate struct {
balanceDelta btcutil.Amount
} }
// nodeUpdates is a type of external state update that reflects an addition or // nodeUpdates is a type of external state update that reflects an addition or
@ -214,13 +208,13 @@ type chanCloseUpdate struct {
// OnBalanceChange is a callback that should be executed each time the balance of // OnBalanceChange is a callback that should be executed each time the balance of
// the backing wallet changes. // the backing wallet changes.
func (a *Agent) OnBalanceChange(delta btcutil.Amount) { func (a *Agent) OnBalanceChange() {
a.wg.Add(1) a.wg.Add(1)
go func() { go func() {
defer a.wg.Done() defer a.wg.Done()
select { select {
case a.stateUpdates <- &balanceUpdate{balanceDelta: delta}: case a.stateUpdates <- &balanceUpdate{}:
case <-a.quit: case <-a.quit:
} }
}() }()
@ -342,12 +336,12 @@ func mergeChanState(pendingChans map[NodeID]Channel,
// and external state changes as a result of decisions it makes w.r.t channel // and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the // allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node. // backing Lightning Node.
func (a *Agent) controller(startingBalance btcutil.Amount) { func (a *Agent) controller() {
defer a.wg.Done() defer a.wg.Done()
// We'll start off by assigning our starting balance, and injecting // We'll start off by assigning our starting balance, and injecting
// that amount as an initial wake up to the main controller goroutine. // that amount as an initial wake up to the main controller goroutine.
a.OnBalanceChange(startingBalance) a.OnBalanceChange()
// TODO(roasbeef): do we in fact need to maintain order? // TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so // * use sync.Cond if so
@ -388,15 +382,16 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// up an additional channel, or splice in funds to an // up an additional channel, or splice in funds to an
// existing one. // existing one.
case *balanceUpdate: case *balanceUpdate:
log.Debugf("Applying external balance state "+ log.Debug("Applying external balance state " +
"update of: %v", update.balanceDelta) "update")
a.totalBalance += update.balanceDelta updateBalance()
// The channel we tried to open previously failed for // The channel we tried to open previously failed for
// whatever reason. // whatever reason.
case *chanOpenFailureUpdate: case *chanOpenFailureUpdate:
log.Debug("Retrying after previous channel open failure.") log.Debug("Retrying after previous channel " +
"open failure.")
updateBalance() updateBalance()

@ -527,7 +527,8 @@ func TestAgentBalanceUpdate(t *testing.T) {
memGraph, _, _ := newMemChanGraph() memGraph, _, _ := newMemChanGraph()
// The wallet will start with 2 BTC available. // The wallet will start with 2 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 2 var walletBalanceMtx sync.Mutex
walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 2)
// With the dependencies we created, we can now create the initial // With the dependencies we created, we can now create the initial
// agent itself. // agent itself.
@ -536,6 +537,8 @@ func TestAgentBalanceUpdate(t *testing.T) {
Heuristic: heuristic, Heuristic: heuristic,
ChanController: chanController, ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
walletBalanceMtx.Lock()
defer walletBalanceMtx.Unlock()
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
@ -583,8 +586,11 @@ func TestAgentBalanceUpdate(t *testing.T) {
// Next we'll send a new balance update signal to the agent, adding 5 // Next we'll send a new balance update signal to the agent, adding 5
// BTC to the amount of available funds. // BTC to the amount of available funds.
const balanceDelta = btcutil.SatoshiPerBitcoin * 5 walletBalanceMtx.Lock()
agent.OnBalanceChange(balanceDelta) walletBalance += btcutil.SatoshiPerBitcoin * 5
walletBalanceMtx.Unlock()
agent.OnBalanceChange()
wg = sync.WaitGroup{} wg = sync.WaitGroup{}
@ -597,11 +603,10 @@ func TestAgentBalanceUpdate(t *testing.T) {
// At this point, the local state of the agent should // At this point, the local state of the agent should
// have also been updated to reflect that the LN node // have also been updated to reflect that the LN node
// now has an additional 5BTC available. // now has an additional 5BTC available.
const expectedAmt = walletBalance + balanceDelta if agent.totalBalance != walletBalance {
if agent.totalBalance != expectedAmt {
t.Fatalf("expected %v wallet balance "+ t.Fatalf("expected %v wallet balance "+
"instead have %v", agent.totalBalance, "instead have %v", agent.totalBalance,
expectedAmt) walletBalance)
} }
// With all of our assertions passed, we'll signal the // With all of our assertions passed, we'll signal the
@ -932,7 +937,8 @@ func TestAgentPendingChannelState(t *testing.T) {
memGraph, _, _ := newMemChanGraph() memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available. // The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6 var walletBalanceMtx sync.Mutex
walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 6)
// With the dependencies we created, we can now create the initial // With the dependencies we created, we can now create the initial
// agent itself. // agent itself.
@ -941,6 +947,9 @@ func TestAgentPendingChannelState(t *testing.T) {
Heuristic: heuristic, Heuristic: heuristic,
ChanController: chanController, ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
walletBalanceMtx.Lock()
defer walletBalanceMtx.Unlock()
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
@ -1039,7 +1048,11 @@ func TestAgentPendingChannelState(t *testing.T) {
// Now, in order to test that the pending state was properly updated, // Now, in order to test that the pending state was properly updated,
// we'll trigger a balance update in order to trigger a query to the // we'll trigger a balance update in order to trigger a query to the
// heuristic. // heuristic.
agent.OnBalanceChange(0.4 * btcutil.SatoshiPerBitcoin) walletBalanceMtx.Lock()
walletBalance += 0.4 * btcutil.SatoshiPerBitcoin
walletBalanceMtx.Unlock()
agent.OnBalanceChange()
// The heuristic should be queried, and the argument for the set of // The heuristic should be queried, and the argument for the set of
// channels passed in should include the pending channels that // channels passed in should include the pending channels that

@ -224,8 +224,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
for { for {
select { select {
case txnUpdate := <-txnSubscription.ConfirmedTransactions(): case <-txnSubscription.ConfirmedTransactions():
pilot.OnBalanceChange(txnUpdate.Value) pilot.OnBalanceChange()
case <-svr.quit: case <-svr.quit:
return return
} }