multi: add resolver reports to Checkpoint
To allow us to write the outcome of our resolver to disk, we add optional resolver reports to the CheckPoint function. Variadic params are used because some checkpoints may have no reports (when the resolver is not yet complete) and some may have two (in the case of a two stage resolution).
This commit is contained in:
parent
8c8f857f60
commit
fa46db9c48
@ -62,8 +62,10 @@ type ArbitratorLog interface {
|
|||||||
|
|
||||||
// InsertUnresolvedContracts inserts a set of unresolved contracts into
|
// InsertUnresolvedContracts inserts a set of unresolved contracts into
|
||||||
// the log. The log will then persistently store each contract until
|
// the log. The log will then persistently store each contract until
|
||||||
// they've been swapped out, or resolved.
|
// they've been swapped out, or resolved. It takes a set of report which
|
||||||
InsertUnresolvedContracts(...ContractResolver) error
|
// should be written to disk if as well if it is non-nil.
|
||||||
|
InsertUnresolvedContracts(reports []*channeldb.ResolverReport,
|
||||||
|
resolvers ...ContractResolver) error
|
||||||
|
|
||||||
// FetchUnresolvedContracts returns all unresolved contracts that have
|
// FetchUnresolvedContracts returns all unresolved contracts that have
|
||||||
// been previously written to the log.
|
// been previously written to the log.
|
||||||
@ -533,7 +535,9 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro
|
|||||||
// swapped out, or resolved.
|
// swapped out, or resolved.
|
||||||
//
|
//
|
||||||
// NOTE: Part of the ContractResolver interface.
|
// NOTE: Part of the ContractResolver interface.
|
||||||
func (b *boltArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResolver) error {
|
func (b *boltArbitratorLog) InsertUnresolvedContracts(reports []*channeldb.ResolverReport,
|
||||||
|
resolvers ...ContractResolver) error {
|
||||||
|
|
||||||
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
|
return kvdb.Batch(b.db, func(tx kvdb.RwTx) error {
|
||||||
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
|
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -547,6 +551,14 @@ func (b *boltArbitratorLog) InsertUnresolvedContracts(resolvers ...ContractResol
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Persist any reports that are present.
|
||||||
|
for _, report := range reports {
|
||||||
|
err := b.cfg.PutResolverReport(tx, report)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -908,15 +920,28 @@ func (b *boltArbitratorLog) WipeHistory() error {
|
|||||||
|
|
||||||
// checkpointContract is a private method that will be fed into
|
// checkpointContract is a private method that will be fed into
|
||||||
// ContractResolver instances to checkpoint their state once they reach
|
// ContractResolver instances to checkpoint their state once they reach
|
||||||
// milestones during contract resolution.
|
// milestones during contract resolution. If the report provided is non-nil,
|
||||||
func (b *boltArbitratorLog) checkpointContract(c ContractResolver) error {
|
// it should also be recorded.
|
||||||
|
func (b *boltArbitratorLog) checkpointContract(c ContractResolver,
|
||||||
|
reports ...*channeldb.ResolverReport) error {
|
||||||
|
|
||||||
return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
|
return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
|
||||||
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
|
contractBucket, err := fetchContractWriteBucket(tx, b.scopeKey[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.writeResolver(contractBucket, c)
|
if err := b.writeResolver(contractBucket, c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, report := range reports {
|
||||||
|
if err := b.cfg.PutResolverReport(tx, report); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -338,8 +338,10 @@ func TestContractInsertionRetrieval(t *testing.T) {
|
|||||||
resolverMap[string(resolvers[3].ResolverKey())] = resolvers[3]
|
resolverMap[string(resolvers[3].ResolverKey())] = resolvers[3]
|
||||||
resolverMap[string(resolvers[4].ResolverKey())] = resolvers[4]
|
resolverMap[string(resolvers[4].ResolverKey())] = resolvers[4]
|
||||||
|
|
||||||
// Now, we'll insert the resolver into the log.
|
// Now, we'll insert the resolver into the log, we do not need to apply
|
||||||
if err := testLog.InsertUnresolvedContracts(resolvers...); err != nil {
|
// any closures, so we will pass in nil.
|
||||||
|
err = testLog.InsertUnresolvedContracts(nil, resolvers...)
|
||||||
|
if err != nil {
|
||||||
t.Fatalf("unable to insert resolvers: %v", err)
|
t.Fatalf("unable to insert resolvers: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -419,8 +421,9 @@ func TestContractResolution(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// First, we'll insert the resolver into the database and ensure that
|
// First, we'll insert the resolver into the database and ensure that
|
||||||
// we get the same resolver out the other side.
|
// we get the same resolver out the other side. We do not need to apply
|
||||||
err = testLog.InsertUnresolvedContracts(timeoutResolver)
|
// any closures.
|
||||||
|
err = testLog.InsertUnresolvedContracts(nil, timeoutResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to insert contract into db: %v", err)
|
t.Fatalf("unable to insert contract into db: %v", err)
|
||||||
}
|
}
|
||||||
@ -482,8 +485,9 @@ func TestContractSwapping(t *testing.T) {
|
|||||||
htlcTimeoutResolver: timeoutResolver,
|
htlcTimeoutResolver: timeoutResolver,
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll first insert the contest resolver into the log.
|
// We'll first insert the contest resolver into the log with no
|
||||||
err = testLog.InsertUnresolvedContracts(contestResolver)
|
// additional updates.
|
||||||
|
err = testLog.InsertUnresolvedContracts(nil, contestResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to insert contract into db: %v", err)
|
t.Fatalf("unable to insert contract into db: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -976,7 +976,7 @@ func (c *ChannelArbitrator) stateStep(
|
|||||||
log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
|
log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
|
||||||
"resolvers", c.cfg.ChanPoint, len(htlcResolvers))
|
"resolvers", c.cfg.ChanPoint, len(htlcResolvers))
|
||||||
|
|
||||||
err = c.log.InsertUnresolvedContracts(htlcResolvers...)
|
err = c.log.InsertUnresolvedContracts(nil, htlcResolvers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return StateError, closeTx, err
|
return StateError, closeTx, err
|
||||||
}
|
}
|
||||||
@ -1744,8 +1744,10 @@ func (c *ChannelArbitrator) prepContractResolutions(
|
|||||||
// resolver so they each can do their duty.
|
// resolver so they each can do their duty.
|
||||||
resolverCfg := ResolverConfig{
|
resolverCfg := ResolverConfig{
|
||||||
ChannelArbitratorConfig: c.cfg,
|
ChannelArbitratorConfig: c.cfg,
|
||||||
Checkpoint: func(res ContractResolver) error {
|
Checkpoint: func(res ContractResolver,
|
||||||
return c.log.InsertUnresolvedContracts(res)
|
reports ...*channeldb.ResolverReport) error {
|
||||||
|
|
||||||
|
return c.log.InsertUnresolvedContracts(reports, res)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ func (b *mockArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver,
|
|||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *mockArbitratorLog) InsertUnresolvedContracts(
|
func (b *mockArbitratorLog) InsertUnresolvedContracts(_ []*channeldb.ResolverReport,
|
||||||
resolvers ...ContractResolver) error {
|
resolvers ...ContractResolver) error {
|
||||||
|
|
||||||
b.Lock()
|
b.Lock()
|
||||||
|
@ -277,7 +277,7 @@ func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
|
|||||||
c.reportLock.Unlock()
|
c.reportLock.Unlock()
|
||||||
|
|
||||||
c.resolved = true
|
c.resolved = true
|
||||||
return nil, c.Checkpoint(c)
|
return nil, c.Checkpoint(c, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop signals the resolver to cancel any current resolution processes, and
|
// Stop signals the resolver to cancel any current resolution processes, and
|
||||||
|
@ -50,7 +50,9 @@ func newCommitSweepResolverTestContext(t *testing.T,
|
|||||||
|
|
||||||
cfg := ResolverConfig{
|
cfg := ResolverConfig{
|
||||||
ChannelArbitratorConfig: chainCfg,
|
ChannelArbitratorConfig: chainCfg,
|
||||||
Checkpoint: func(_ ContractResolver) error {
|
Checkpoint: func(_ ContractResolver,
|
||||||
|
_ ...*channeldb.ResolverReport) error {
|
||||||
|
|
||||||
checkPointChan <- struct{}{}
|
checkPointChan <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
@ -86,8 +86,10 @@ type ResolverConfig struct {
|
|||||||
|
|
||||||
// Checkpoint allows a resolver to check point its state. This function
|
// Checkpoint allows a resolver to check point its state. This function
|
||||||
// should write the state of the resolver to persistent storage, and
|
// should write the state of the resolver to persistent storage, and
|
||||||
// return a non-nil error upon success.
|
// return a non-nil error upon success. It takes a resolver report,
|
||||||
Checkpoint func(ContractResolver) error
|
// which contains information about the outcome and should be written
|
||||||
|
// to disk if non-nil.
|
||||||
|
Checkpoint func(ContractResolver, ...*channeldb.ResolverReport) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// contractResolverKit is meant to be used as a mix-in struct to be embedded within a
|
// contractResolverKit is meant to be used as a mix-in struct to be embedded within a
|
||||||
|
@ -260,7 +260,9 @@ func newIncomingResolverTestContext(t *testing.T) *incomingResolverTestContext {
|
|||||||
|
|
||||||
cfg := ResolverConfig{
|
cfg := ResolverConfig{
|
||||||
ChannelArbitratorConfig: chainCfg,
|
ChannelArbitratorConfig: chainCfg,
|
||||||
Checkpoint: func(_ ContractResolver) error {
|
Checkpoint: func(_ ContractResolver,
|
||||||
|
_ ...*channeldb.ResolverReport) error {
|
||||||
|
|
||||||
checkPointChan <- struct{}{}
|
checkPointChan <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
@ -134,7 +134,9 @@ func newOutgoingResolverTestContext(t *testing.T) *outgoingResolverTestContext {
|
|||||||
|
|
||||||
cfg := ResolverConfig{
|
cfg := ResolverConfig{
|
||||||
ChannelArbitratorConfig: chainCfg,
|
ChannelArbitratorConfig: chainCfg,
|
||||||
Checkpoint: func(_ ContractResolver) error {
|
Checkpoint: func(_ ContractResolver,
|
||||||
|
_ ...*channeldb.ResolverReport) error {
|
||||||
|
|
||||||
checkPointChan <- struct{}{}
|
checkPointChan <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
@ -259,7 +259,9 @@ func TestHtlcTimeoutResolver(t *testing.T) {
|
|||||||
|
|
||||||
cfg := ResolverConfig{
|
cfg := ResolverConfig{
|
||||||
ChannelArbitratorConfig: chainCfg,
|
ChannelArbitratorConfig: chainCfg,
|
||||||
Checkpoint: func(_ ContractResolver) error {
|
Checkpoint: func(_ ContractResolver,
|
||||||
|
_ ...*channeldb.ResolverReport) error {
|
||||||
|
|
||||||
checkPointChan <- struct{}{}
|
checkPointChan <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
Loading…
Reference in New Issue
Block a user