routing: persist mission control data

This commit is contained in:
Joost Jager 2019-06-26 13:00:35 +02:00
parent 3dc83d1b6b
commit 7e7b620355
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
14 changed files with 566 additions and 28 deletions

View File

@ -24,4 +24,8 @@ type RoutingConfig struct {
// executing a payment attempt that fails. It is used to trade off
// potentially better routes against their probability of succeeding.
AttemptCost btcutil.Amount `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"`
// MaxMcHistory defines the maximum number of payment results that
// are held on disk by mission control.
MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"`
}

View File

@ -49,6 +49,7 @@ func DefaultConfig() *Config {
PenaltyHalfLife: routing.DefaultPenaltyHalfLife,
AttemptCost: routing.DefaultPaymentAttemptPenalty.
ToSatoshis(),
MaxMcHistory: routing.DefaultMaxMcHistory,
}
return &Config{
@ -63,5 +64,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
MinRouteProbability: cfg.MinRouteProbability,
AttemptCost: cfg.AttemptCost,
PenaltyHalfLife: cfg.PenaltyHalfLife,
MaxMcHistory: cfg.MaxMcHistory,
}
}

View File

@ -22,5 +22,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
AttemptCost: routing.DefaultPaymentAttemptPenalty.
ToSatoshis(),
PenaltyHalfLife: routing.DefaultPenaltyHalfLife,
MaxMcHistory: routing.DefaultMaxMcHistory,
}
}

View File

@ -64,7 +64,7 @@ type MissionControl interface {
// ResetHistory resets the history of MissionControl returning it to a state as
// if no payment attempts have been made.
ResetHistory()
ResetHistory() error
// GetHistorySnapshot takes a snapshot from the current mission control state
// and actual probability estimates.

View File

@ -147,7 +147,9 @@ func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex,
return testMissionControlProb
}
func (m *mockMissionControl) ResetHistory() {}
func (m *mockMissionControl) ResetHistory() error {
return nil
}
func (m *mockMissionControl) GetHistorySnapshot() *routing.MissionControlSnapshot {
return nil

View File

@ -436,7 +436,10 @@ func marshallChannelUpdate(update *lnwire.ChannelUpdate) *ChannelUpdate {
func (s *Server) ResetMissionControl(ctx context.Context,
req *ResetMissionControlRequest) (*ResetMissionControlResponse, error) {
s.cfg.RouterBackend.MissionControl.ResetHistory()
err := s.cfg.RouterBackend.MissionControl.ResetHistory()
if err != nil {
return nil, err
}
return &ResetMissionControlResponse{}, nil
}

View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
@ -39,6 +40,9 @@ const (
// Nodes forward non-strict, so it isn't necessary to apply a less
// restrictive channel level tracking scheme here.
minSecondChanceInterval = time.Minute
// DefaultMaxMcHistory is the default maximum history size.
DefaultMaxMcHistory = 1000
)
// MissionControl contains state which summarizes the past attempts of HTLC
@ -63,6 +67,8 @@ type MissionControl struct {
cfg *MissionControlConfig
store *missionControlStore
sync.Mutex
// TODO(roasbeef): further counters, if vertex continually unavailable,
@ -81,6 +87,10 @@ type MissionControlConfig struct {
// AprioriHopProbability is the assumed success probability of a hop in
// a route when no other information is available.
AprioriHopProbability float64
// MaxMcHistory defines the maximum number of payment results that are
// held on disk.
MaxMcHistory int
}
// nodeHistory contains a summary of payment attempt outcomes involving a
@ -159,29 +169,70 @@ type paymentResult struct {
}
// NewMissionControl returns a new instance of missionControl.
func NewMissionControl(cfg *MissionControlConfig) *MissionControl {
func NewMissionControl(db *bbolt.DB, cfg *MissionControlConfig) (
*MissionControl, error) {
log.Debugf("Instantiating mission control with config: "+
"PenaltyHalfLife=%v, AprioriHopProbability=%v",
cfg.PenaltyHalfLife, cfg.AprioriHopProbability)
return &MissionControl{
store, err := newMissionControlStore(db, cfg.MaxMcHistory)
if err != nil {
return nil, err
}
mc := &MissionControl{
history: make(map[route.Vertex]*nodeHistory),
lastSecondChance: make(map[DirectedNodePair]time.Time),
now: time.Now,
cfg: cfg,
store: store,
}
if err := mc.init(); err != nil {
return nil, err
}
return mc, nil
}
// init initializes mission control with historical data.
func (m *MissionControl) init() error {
log.Debugf("Mission control state reconstruction started")
start := time.Now()
results, err := m.store.fetchAll()
if err != nil {
return err
}
for _, result := range results {
m.applyPaymentResult(result)
}
log.Debugf("Mission control state reconstruction finished: "+
"n=%v, time=%v", len(results), time.Now().Sub(start))
return nil
}
// ResetHistory resets the history of MissionControl returning it to a state as
// if no payment attempts have been made.
func (m *MissionControl) ResetHistory() {
func (m *MissionControl) ResetHistory() error {
m.Lock()
defer m.Unlock()
if err := m.store.clear(); err != nil {
return err
}
m.history = make(map[route.Vertex]*nodeHistory)
m.lastSecondChance = make(map[DirectedNodePair]time.Time)
log.Debugf("Mission control history cleared")
return nil
}
// GetEdgeProbability is expected to return the success probability of a payment
@ -406,7 +457,7 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
// be made.
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
failureSourceIdx *int, failure lnwire.FailureMessage) (bool,
channeldb.FailureReason) {
channeldb.FailureReason, error) {
timestamp := m.now()
@ -421,8 +472,15 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
route: rt,
}
// Store complete result in database.
if err := m.store.AddResult(result); err != nil {
return false, 0, err
}
// Apply result to update mission control state.
return m.applyPaymentResult(result)
final, reason := m.applyPaymentResult(result)
return final, reason, nil
}
// applyPaymentResult applies a payment result as input for future probability

View File

@ -0,0 +1,269 @@
package routing
import (
"bytes"
"encoding/binary"
"fmt"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// resultsKey is the fixed key under which the attempt results are
// stored.
resultsKey = []byte("missioncontrol-results")
// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
)
const (
// unknownFailureSourceIdx is the database encoding of an unknown error
// source.
unknownFailureSourceIdx = -1
)
// missionControlStore is a bolt db based implementation of a mission control
// store. It stores the raw payment attempt data from which the internal mission
// controls state can be rederived on startup. This allows the mission control
// internal data structure to be changed without requiring a database migration.
// Also changes to mission control parameters can be applied to historical data.
// Finally, it enables importing raw data from an external source.
type missionControlStore struct {
db *bbolt.DB
maxRecords int
numRecords int
}
func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore, error) {
store := &missionControlStore{
db: db,
maxRecords: maxRecords,
}
// Create buckets if not yet existing.
err := db.Update(func(tx *bbolt.Tx) error {
resultsBucket, err := tx.CreateBucketIfNotExists(resultsKey)
if err != nil {
return fmt.Errorf("cannot create results bucket: %v",
err)
}
// Count initial number of results and track this number in
// memory to avoid calling Stats().KeyN. The reliability of
// Stats() is doubtful and seemed to have caused crashes in the
// past (see #1874).
c := resultsBucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
store.numRecords++
}
return nil
})
if err != nil {
return nil, err
}
return store, nil
}
// clear removes all results from the db.
func (b *missionControlStore) clear() error {
return b.db.Update(func(tx *bbolt.Tx) error {
if err := tx.DeleteBucket(resultsKey); err != nil {
return err
}
_, err := tx.CreateBucket(resultsKey)
return err
})
}
// fetchAll returns all results currently stored in the database.
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
var results []*paymentResult
err := b.db.View(func(tx *bbolt.Tx) error {
resultBucket := tx.Bucket(resultsKey)
results = make([]*paymentResult, 0)
return resultBucket.ForEach(func(k, v []byte) error {
result, err := deserializeResult(k, v)
if err != nil {
return err
}
results = append(results, result)
return nil
})
})
if err != nil {
return nil, err
}
return results, nil
}
// serializeResult serializes a payment result and returns a key and value byte
// slice to insert into the bucket.
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
// Write timestamps, success status, failure source index and route.
var b bytes.Buffer
var dbFailureSourceIdx int32
if rp.failureSourceIdx == nil {
dbFailureSourceIdx = unknownFailureSourceIdx
} else {
dbFailureSourceIdx = int32(*rp.failureSourceIdx)
}
err := channeldb.WriteElements(
&b,
uint64(rp.timeFwd.UnixNano()),
uint64(rp.timeReply.UnixNano()),
rp.success, dbFailureSourceIdx,
)
if err != nil {
return nil, nil, err
}
if err := channeldb.SerializeRoute(&b, *rp.route); err != nil {
return nil, nil, err
}
// Write failure. If there is no failure message, write an empty
// byte slice.
var failureBytes bytes.Buffer
if rp.failure != nil {
err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
if err != nil {
return nil, nil, err
}
}
err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
if err != nil {
return nil, nil, err
}
// Compose key that identifies this result.
key := getResultKey(rp)
return key, b.Bytes(), nil
}
// deserializeResult deserializes a payment result.
func deserializeResult(k, v []byte) (*paymentResult, error) {
// Parse payment id.
result := paymentResult{
id: byteOrder.Uint64(k[8:]),
}
r := bytes.NewReader(v)
// Read timestamps, success status and failure source index.
var (
timeFwd, timeReply uint64
dbFailureSourceIdx int32
)
err := channeldb.ReadElements(
r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
)
if err != nil {
return nil, err
}
// Convert time stamps to local time zone for consistent logging.
result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
result.timeReply = time.Unix(0, int64(timeReply)).Local()
// Convert from unknown index magic number to nil value.
if dbFailureSourceIdx != unknownFailureSourceIdx {
failureSourceIdx := int(dbFailureSourceIdx)
result.failureSourceIdx = &failureSourceIdx
}
// Read route.
route, err := channeldb.DeserializeRoute(r)
if err != nil {
return nil, err
}
result.route = &route
// Read failure.
failureBytes, err := wire.ReadVarBytes(
r, 0, lnwire.FailureMessageLength, "failure",
)
if err != nil {
return nil, err
}
if len(failureBytes) > 0 {
result.failure, err = lnwire.DecodeFailureMessage(
bytes.NewReader(failureBytes), 0,
)
if err != nil {
return nil, err
}
}
return &result, nil
}
// AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) error {
return b.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(resultsKey)
// Prune oldest entries.
if b.maxRecords > 0 {
for b.numRecords >= b.maxRecords {
cursor := bucket.Cursor()
cursor.First()
if err := cursor.Delete(); err != nil {
return err
}
b.numRecords--
}
}
// Serialize result into key and value byte slices.
k, v, err := serializeResult(rp)
if err != nil {
return err
}
// The store is assumed to be idempotent. It could be that the
// same result is added twice and in that case the counter
// shouldn't be increased.
if bucket.Get(k) == nil {
b.numRecords++
}
// Put into results bucket.
return bucket.Put(k, v)
})
}
// getResultKey returns a byte slice representing a unique key for this payment
// result.
func getResultKey(rp *paymentResult) []byte {
var keyBytes [8 + 8 + 33]byte
// Identify records by a combination of time, payment id and sender pub
// key. This allows importing mission control data from an external
// source without key collisions and keeps the records sorted
// chronologically.
byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
byteOrder.PutUint64(keyBytes[8:], rp.id)
copy(keyBytes[16:], rp.route.SourcePubKey[:])
return keyBytes[:]
}

View File

@ -0,0 +1,140 @@
package routing
import (
"io/ioutil"
"os"
"reflect"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/routing/route"
)
const testMaxRecords = 2
func TestMissionControlStore(t *testing.T) {
// Set time zone explictly to keep test deterministic.
time.Local = time.UTC
file, err := ioutil.TempFile("", "*.db")
if err != nil {
t.Fatal(err)
}
dbPath := file.Name()
db, err := bbolt.Open(dbPath, 0600, nil)
if err != nil {
t.Fatal(err)
}
defer db.Close()
defer os.Remove(dbPath)
store, err := newMissionControlStore(db, testMaxRecords)
if err != nil {
t.Fatal(err)
}
results, err := store.fetchAll()
if err != nil {
t.Fatal(err)
}
if len(results) != 0 {
t.Fatal("expected no results")
}
testRoute := route.Route{
SourcePubKey: route.Vertex{1},
Hops: []*route.Hop{
{
PubKeyBytes: route.Vertex{2},
},
},
}
failureSourceIdx := 1
result1 := paymentResult{
route: &testRoute,
failure: lnwire.NewFailUnknownPaymentHash(100),
failureSourceIdx: &failureSourceIdx,
id: 99,
timeReply: testTime,
timeFwd: testTime.Add(-time.Minute),
}
result2 := result1
result2.timeReply = result1.timeReply.Add(time.Hour)
result2.timeFwd = result1.timeReply.Add(time.Hour)
result2.id = 2
// Store result.
err = store.AddResult(&result2)
if err != nil {
t.Fatal(err)
}
// Store again to test idempotency.
err = store.AddResult(&result2)
if err != nil {
t.Fatal(err)
}
// Store second result which has an earlier timestamp.
err = store.AddResult(&result1)
if err != nil {
t.Fatal(err)
}
results, err = store.fetchAll()
if err != nil {
t.Fatal(err)
}
if len(results) != 2 {
t.Fatal("expected two results")
}
// Check that results are stored in chronological order.
if !reflect.DeepEqual(&result1, results[0]) {
t.Fatal()
}
if !reflect.DeepEqual(&result2, results[1]) {
t.Fatal()
}
// Recreate store to test pruning.
store, err = newMissionControlStore(db, testMaxRecords)
if err != nil {
t.Fatal(err)
}
// Add a newer result.
result3 := result1
result3.timeReply = result1.timeReply.Add(2 * time.Hour)
result3.timeFwd = result1.timeReply.Add(2 * time.Hour)
result3.id = 3
err = store.AddResult(&result3)
if err != nil {
t.Fatal(err)
}
// Check that results are pruned.
results, err = store.fetchAll()
if err != nil {
t.Fatal(err)
}
if len(results) != 2 {
t.Fatal("expected two results")
}
if !reflect.DeepEqual(&result2, results[0]) {
t.Fatal()
}
if !reflect.DeepEqual(&result3, results[1]) {
t.Fatal()
}
}

View File

@ -1,9 +1,12 @@
package routing
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
@ -37,6 +40,10 @@ type mcTestContext struct {
t *testing.T
mc *MissionControl
now time.Time
db *bbolt.DB
dbPath string
pid uint64
}
@ -46,17 +53,44 @@ func createMcTestContext(t *testing.T) *mcTestContext {
now: mcTestTime,
}
mc := NewMissionControl(
file, err := ioutil.TempFile("", "*.db")
if err != nil {
t.Fatal(err)
}
ctx.dbPath = file.Name()
ctx.db, err = bbolt.Open(ctx.dbPath, 0600, nil)
if err != nil {
t.Fatal(err)
}
ctx.restartMc()
return ctx
}
// restartMc creates a new instances of mission control on the same database.
func (ctx *mcTestContext) restartMc() {
mc, err := NewMissionControl(
ctx.db,
&MissionControlConfig{
PenaltyHalfLife: 30 * time.Minute,
AprioriHopProbability: 0.8,
},
)
if err != nil {
ctx.t.Fatal(err)
}
mc.now = func() time.Time { return ctx.now }
ctx.mc = mc
}
return ctx
// cleanup closes the database and removes the temp file.
func (ctx *mcTestContext) cleanup() {
ctx.db.Close()
os.Remove(ctx.dbPath)
}
// Assert that mission control returns a probability for an edge.
@ -86,6 +120,7 @@ func (ctx *mcTestContext) reportFailure(t time.Time,
// TestMissionControl tests mission control probability estimation.
func TestMissionControl(t *testing.T) {
ctx := createMcTestContext(t)
defer ctx.cleanup()
ctx.now = testTime
@ -122,6 +157,10 @@ func TestMissionControl(t *testing.T) {
ctx.now = testTime.Add(60 * time.Minute)
ctx.expectP(1000, 0.4)
// Restart mission control to test persistence.
ctx.restartMc()
ctx.expectP(1000, 0.4)
// A node level failure should bring probability of every channel back
// to zero.
ctx.reportFailure(
@ -145,6 +184,7 @@ func TestMissionControl(t *testing.T) {
// penalizing the channel yet.
func TestMissionControlChannelUpdate(t *testing.T) {
ctx := createMcTestContext(t)
defer ctx.cleanup()
// Report a policy related failure. Because it is the first, we don't
// expect a penalty.

View File

@ -100,9 +100,9 @@ var _ MissionController = (*mockMissionControl)(nil)
func (m *mockMissionControl) ReportPaymentFail(paymentID uint64,
rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (
bool, channeldb.FailureReason) {
bool, channeldb.FailureReason, error) {
return false, 0
return false, 0, nil
}
func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge,

View File

@ -180,7 +180,7 @@ type MissionController interface {
// need to be made.
ReportPaymentFail(paymentID uint64, rt *route.Route,
failureSourceIdx *int, failure lnwire.FailureMessage) (bool,
channeldb.FailureReason)
channeldb.FailureReason, error)
// GetEdgeProbability is expected to return the success probability of a
// payment from fromNode along edge.
@ -1896,12 +1896,27 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route,
sendErr error) (bool, channeldb.FailureReason) {
reportFail := func(srcIdx *int, msg lnwire.FailureMessage) (bool,
channeldb.FailureReason) {
// Report outcome to mission control.
final, reason, err := r.cfg.MissionControl.ReportPaymentFail(
paymentID, rt, srcIdx, msg,
)
if err != nil {
log.Errorf("Error reporting payment result to mc: %v",
err)
return true, channeldb.FailureReasonError
}
return final, reason
}
if sendErr == htlcswitch.ErrUnreadableFailureMessage {
log.Tracef("Unreadable failure when sending htlc")
return r.cfg.MissionControl.ReportPaymentFail(
paymentID, rt, nil, nil,
)
return reportFail(nil, nil)
}
// If an internal, non-forwarding error occurred, we can stop
// trying.
@ -1927,9 +1942,7 @@ func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route,
log.Tracef("Node=%v reported failure when sending htlc",
failureSourceIdx)
return r.cfg.MissionControl.ReportPaymentFail(
paymentID, rt, &failureSourceIdx, failureMessage,
)
return reportFail(&failureSourceIdx, failureMessage)
}
// extractChannelUpdate examines the error and extracts the channel update.

View File

@ -90,7 +90,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
return nil, nil, err
}
pathFindingConfig := &PathFindingConfig{
pathFindingConfig := PathFindingConfig{
MinProbability: 0.01,
PaymentAttemptPenalty: 100,
}
@ -100,9 +100,13 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
AprioriHopProbability: 0.9,
}
mc := NewMissionControl(
mc, err := NewMissionControl(
graphInstance.graph.Database().DB,
mcConfig,
)
if err != nil {
return nil, nil, err
}
sessionSource := &SessionSource{
Graph: graphInstance.graph,
@ -110,11 +114,8 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
return lnwire.NewMSatFromSatoshis(e.Capacity)
},
PathFindingConfig: PathFindingConfig{
MinProbability: 0.01,
PaymentAttemptPenalty: 100,
},
MissionControl: mc,
PathFindingConfig: pathFindingConfig,
MissionControl: mc,
}
router, err := New(Config{
@ -134,7 +135,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
next := atomic.AddUint64(&uniquePaymentID, 1)
return next, nil
},
PathFindingConfig: *pathFindingConfig,
PathFindingConfig: pathFindingConfig,
})
if err != nil {
return nil, nil, fmt.Errorf("unable to create router %v", err)

View File

@ -653,12 +653,17 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
// servers, the mission control instance itself can be moved there too.
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
s.missionControl = routing.NewMissionControl(
s.missionControl, err = routing.NewMissionControl(
chanDB.DB,
&routing.MissionControlConfig{
AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
MaxMcHistory: routingConfig.MaxMcHistory,
},
)
if err != nil {
return nil, fmt.Errorf("can't create mission control: %v", err)
}
srvrLog.Debugf("Instantiating payment session source with config: "+
"PaymentAttemptPenalty=%v, MinRouteProbability=%v",