cnct: extract relaunching of resolvers to method

Co-authored-by: Joost Jager <joost.jager@gmail.com>
This commit is contained in:
Wilmer Paulino 2019-01-22 20:45:26 -08:00
parent cebc4d8dba
commit 865f7568d6
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -243,8 +243,7 @@ func (c *ChannelArbitrator) Start() error {
} }
var ( var (
err error err error
unresolvedContracts []ContractResolver
) )
log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v", log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v",
@ -332,23 +331,10 @@ func (c *ChannelArbitrator) Start() error {
if startingState == StateWaitingFullResolution && if startingState == StateWaitingFullResolution &&
nextState == StateWaitingFullResolution { nextState == StateWaitingFullResolution {
// We'll now query our log to see if there are any active if err := c.relaunchResolvers(); err != nil {
// unresolved contracts. If this is the case, then we'll
// relaunch all contract resolvers.
unresolvedContracts, err = c.log.FetchUnresolvedContracts()
if err != nil {
c.cfg.BlockEpochs.Cancel() c.cfg.BlockEpochs.Cancel()
return err return err
} }
log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
"resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
c.activeResolvers = unresolvedContracts
for _, contract := range unresolvedContracts {
c.wg.Add(1)
go c.resolveContract(contract)
}
} }
// TODO(roasbeef): cancel if breached // TODO(roasbeef): cancel if breached
@ -358,6 +344,28 @@ func (c *ChannelArbitrator) Start() error {
return nil return nil
} }
// relauchResolvers relaunches the set of resolvers for unresolved contracts in
// order to provide them with information that's not immediately available upon
// starting the ChannelArbitrator. This information should ideally be stored in
// the database, so this only serves as a intermediate work-around to prevent a
// migration.
func (c *ChannelArbitrator) relaunchResolvers() error {
// We'll now query our log to see if there are any active
// unresolved contracts. If this is the case, then we'll
// relaunch all contract resolvers.
unresolvedContracts, err := c.log.FetchUnresolvedContracts()
if err != nil {
return err
}
log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
"resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
c.launchResolvers(unresolvedContracts)
return nil
}
// Stop signals the ChannelArbitrator for a graceful shutdown. // Stop signals the ChannelArbitrator for a graceful shutdown.
func (c *ChannelArbitrator) Stop() error { func (c *ChannelArbitrator) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
@ -703,11 +711,7 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
// Finally, we'll launch all the required contract resolvers. // Finally, we'll launch all the required contract resolvers.
// Once they're all resolved, we're no longer needed. // Once they're all resolved, we're no longer needed.
c.activeResolvers = htlcResolvers c.launchResolvers(htlcResolvers)
for _, contract := range htlcResolvers {
c.wg.Add(1)
go c.resolveContract(contract)
}
nextState = StateWaitingFullResolution nextState = StateWaitingFullResolution
@ -741,6 +745,15 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
return nextState, closeTx, nil return nextState, closeTx, nil
} }
// launchResolvers updates the activeResolvers list and starts the resolvers.
func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver) {
c.activeResolvers = resolvers
for _, contract := range resolvers {
c.wg.Add(1)
go c.resolveContract(contract)
}
}
// advanceState is the main driver of our state machine. This method is an // advanceState is the main driver of our state machine. This method is an
// iterative function which repeatedly attempts to advance the internal state // iterative function which repeatedly attempts to advance the internal state
// of the channel arbitrator. The state will be advanced until we reach a // of the channel arbitrator. The state will be advanced until we reach a