From 7e7b62035598bd66127add8cc0bff1b3611a66de Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 13:00:35 +0200 Subject: [PATCH] routing: persist mission control data --- lnrpc/routerrpc/config.go | 4 + lnrpc/routerrpc/config_active.go | 2 + lnrpc/routerrpc/config_default.go | 1 + lnrpc/routerrpc/router_backend.go | 2 +- lnrpc/routerrpc/router_backend_test.go | 4 +- lnrpc/routerrpc/router_server.go | 5 +- routing/missioncontrol.go | 68 ++++++- routing/missioncontrol_store.go | 269 +++++++++++++++++++++++++ routing/missioncontrol_store_test.go | 140 +++++++++++++ routing/missioncontrol_test.go | 44 +++- routing/mock_test.go | 4 +- routing/router.go | 27 ++- routing/router_test.go | 17 +- server.go | 7 +- 14 files changed, 566 insertions(+), 28 deletions(-) create mode 100644 routing/missioncontrol_store.go create mode 100644 routing/missioncontrol_store_test.go diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go index 2f82c5ae..ba094c8f 100644 --- a/lnrpc/routerrpc/config.go +++ b/lnrpc/routerrpc/config.go @@ -24,4 +24,8 @@ type RoutingConfig struct { // executing a payment attempt that fails. It is used to trade off // potentially better routes against their probability of succeeding. AttemptCost btcutil.Amount `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"` + + // MaxMcHistory defines the maximum number of payment results that + // are held on disk by mission control. + MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"` } diff --git a/lnrpc/routerrpc/config_active.go b/lnrpc/routerrpc/config_active.go index 832c86d7..0479211c 100644 --- a/lnrpc/routerrpc/config_active.go +++ b/lnrpc/routerrpc/config_active.go @@ -49,6 +49,7 @@ func DefaultConfig() *Config { PenaltyHalfLife: routing.DefaultPenaltyHalfLife, AttemptCost: routing.DefaultPaymentAttemptPenalty. ToSatoshis(), + MaxMcHistory: routing.DefaultMaxMcHistory, } return &Config{ @@ -63,5 +64,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig { MinRouteProbability: cfg.MinRouteProbability, AttemptCost: cfg.AttemptCost, PenaltyHalfLife: cfg.PenaltyHalfLife, + MaxMcHistory: cfg.MaxMcHistory, } } diff --git a/lnrpc/routerrpc/config_default.go b/lnrpc/routerrpc/config_default.go index 6308ca54..f8af98f3 100644 --- a/lnrpc/routerrpc/config_default.go +++ b/lnrpc/routerrpc/config_default.go @@ -22,5 +22,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig { AttemptCost: routing.DefaultPaymentAttemptPenalty. ToSatoshis(), PenaltyHalfLife: routing.DefaultPenaltyHalfLife, + MaxMcHistory: routing.DefaultMaxMcHistory, } } diff --git a/lnrpc/routerrpc/router_backend.go b/lnrpc/routerrpc/router_backend.go index 139d4963..d4d1d8e2 100644 --- a/lnrpc/routerrpc/router_backend.go +++ b/lnrpc/routerrpc/router_backend.go @@ -64,7 +64,7 @@ type MissionControl interface { // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. - ResetHistory() + ResetHistory() error // GetHistorySnapshot takes a snapshot from the current mission control state // and actual probability estimates. diff --git a/lnrpc/routerrpc/router_backend_test.go b/lnrpc/routerrpc/router_backend_test.go index e36baa7e..1a0ce3ee 100644 --- a/lnrpc/routerrpc/router_backend_test.go +++ b/lnrpc/routerrpc/router_backend_test.go @@ -147,7 +147,9 @@ func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, return testMissionControlProb } -func (m *mockMissionControl) ResetHistory() {} +func (m *mockMissionControl) ResetHistory() error { + return nil +} func (m *mockMissionControl) GetHistorySnapshot() *routing.MissionControlSnapshot { return nil diff --git a/lnrpc/routerrpc/router_server.go b/lnrpc/routerrpc/router_server.go index fc577969..64b88f62 100644 --- a/lnrpc/routerrpc/router_server.go +++ b/lnrpc/routerrpc/router_server.go @@ -436,7 +436,10 @@ func marshallChannelUpdate(update *lnwire.ChannelUpdate) *ChannelUpdate { func (s *Server) ResetMissionControl(ctx context.Context, req *ResetMissionControlRequest) (*ResetMissionControlResponse, error) { - s.cfg.RouterBackend.MissionControl.ResetHistory() + err := s.cfg.RouterBackend.MissionControl.ResetHistory() + if err != nil { + return nil, err + } return &ResetMissionControlResponse{}, nil } diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 350f35b6..e85fb19a 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -39,6 +40,9 @@ const ( // Nodes forward non-strict, so it isn't necessary to apply a less // restrictive channel level tracking scheme here. minSecondChanceInterval = time.Minute + + // DefaultMaxMcHistory is the default maximum history size. + DefaultMaxMcHistory = 1000 ) // MissionControl contains state which summarizes the past attempts of HTLC @@ -63,6 +67,8 @@ type MissionControl struct { cfg *MissionControlConfig + store *missionControlStore + sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, @@ -81,6 +87,10 @@ type MissionControlConfig struct { // AprioriHopProbability is the assumed success probability of a hop in // a route when no other information is available. AprioriHopProbability float64 + + // MaxMcHistory defines the maximum number of payment results that are + // held on disk. + MaxMcHistory int } // nodeHistory contains a summary of payment attempt outcomes involving a @@ -159,29 +169,70 @@ type paymentResult struct { } // NewMissionControl returns a new instance of missionControl. -func NewMissionControl(cfg *MissionControlConfig) *MissionControl { +func NewMissionControl(db *bbolt.DB, cfg *MissionControlConfig) ( + *MissionControl, error) { + log.Debugf("Instantiating mission control with config: "+ "PenaltyHalfLife=%v, AprioriHopProbability=%v", cfg.PenaltyHalfLife, cfg.AprioriHopProbability) - return &MissionControl{ + store, err := newMissionControlStore(db, cfg.MaxMcHistory) + if err != nil { + return nil, err + } + + mc := &MissionControl{ history: make(map[route.Vertex]*nodeHistory), lastSecondChance: make(map[DirectedNodePair]time.Time), now: time.Now, cfg: cfg, + store: store, } + + if err := mc.init(); err != nil { + return nil, err + } + + return mc, nil +} + +// init initializes mission control with historical data. +func (m *MissionControl) init() error { + log.Debugf("Mission control state reconstruction started") + + start := time.Now() + + results, err := m.store.fetchAll() + if err != nil { + return err + } + + for _, result := range results { + m.applyPaymentResult(result) + } + + log.Debugf("Mission control state reconstruction finished: "+ + "n=%v, time=%v", len(results), time.Now().Sub(start)) + + return nil } // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. -func (m *MissionControl) ResetHistory() { +func (m *MissionControl) ResetHistory() error { m.Lock() defer m.Unlock() + if err := m.store.clear(); err != nil { + return err + } + m.history = make(map[route.Vertex]*nodeHistory) m.lastSecondChance = make(map[DirectedNodePair]time.Time) log.Debugf("Mission control history cleared") + + return nil } // GetEdgeProbability is expected to return the success probability of a payment @@ -406,7 +457,7 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { // be made. func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, - channeldb.FailureReason) { + channeldb.FailureReason, error) { timestamp := m.now() @@ -421,8 +472,15 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, route: rt, } + // Store complete result in database. + if err := m.store.AddResult(result); err != nil { + return false, 0, err + } + // Apply result to update mission control state. - return m.applyPaymentResult(result) + final, reason := m.applyPaymentResult(result) + + return final, reason, nil } // applyPaymentResult applies a payment result as input for future probability diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go new file mode 100644 index 00000000..329d819f --- /dev/null +++ b/routing/missioncontrol_store.go @@ -0,0 +1,269 @@ +package routing + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/btcsuite/btcd/wire" + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // resultsKey is the fixed key under which the attempt results are + // stored. + resultsKey = []byte("missioncontrol-results") + + // Big endian is the preferred byte order, due to cursor scans over + // integer keys iterating in order. + byteOrder = binary.BigEndian +) + +const ( + // unknownFailureSourceIdx is the database encoding of an unknown error + // source. + unknownFailureSourceIdx = -1 +) + +// missionControlStore is a bolt db based implementation of a mission control +// store. It stores the raw payment attempt data from which the internal mission +// controls state can be rederived on startup. This allows the mission control +// internal data structure to be changed without requiring a database migration. +// Also changes to mission control parameters can be applied to historical data. +// Finally, it enables importing raw data from an external source. +type missionControlStore struct { + db *bbolt.DB + maxRecords int + numRecords int +} + +func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore, error) { + store := &missionControlStore{ + db: db, + maxRecords: maxRecords, + } + + // Create buckets if not yet existing. + err := db.Update(func(tx *bbolt.Tx) error { + resultsBucket, err := tx.CreateBucketIfNotExists(resultsKey) + if err != nil { + return fmt.Errorf("cannot create results bucket: %v", + err) + } + + // Count initial number of results and track this number in + // memory to avoid calling Stats().KeyN. The reliability of + // Stats() is doubtful and seemed to have caused crashes in the + // past (see #1874). + c := resultsBucket.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + store.numRecords++ + } + + return nil + }) + if err != nil { + return nil, err + } + + return store, nil +} + +// clear removes all results from the db. +func (b *missionControlStore) clear() error { + return b.db.Update(func(tx *bbolt.Tx) error { + if err := tx.DeleteBucket(resultsKey); err != nil { + return err + } + + _, err := tx.CreateBucket(resultsKey) + return err + }) +} + +// fetchAll returns all results currently stored in the database. +func (b *missionControlStore) fetchAll() ([]*paymentResult, error) { + var results []*paymentResult + + err := b.db.View(func(tx *bbolt.Tx) error { + resultBucket := tx.Bucket(resultsKey) + results = make([]*paymentResult, 0) + + return resultBucket.ForEach(func(k, v []byte) error { + result, err := deserializeResult(k, v) + if err != nil { + return err + } + + results = append(results, result) + + return nil + }) + + }) + if err != nil { + return nil, err + } + + return results, nil +} + +// serializeResult serializes a payment result and returns a key and value byte +// slice to insert into the bucket. +func serializeResult(rp *paymentResult) ([]byte, []byte, error) { + // Write timestamps, success status, failure source index and route. + var b bytes.Buffer + + var dbFailureSourceIdx int32 + if rp.failureSourceIdx == nil { + dbFailureSourceIdx = unknownFailureSourceIdx + } else { + dbFailureSourceIdx = int32(*rp.failureSourceIdx) + } + + err := channeldb.WriteElements( + &b, + uint64(rp.timeFwd.UnixNano()), + uint64(rp.timeReply.UnixNano()), + rp.success, dbFailureSourceIdx, + ) + if err != nil { + return nil, nil, err + } + + if err := channeldb.SerializeRoute(&b, *rp.route); err != nil { + return nil, nil, err + } + + // Write failure. If there is no failure message, write an empty + // byte slice. + var failureBytes bytes.Buffer + if rp.failure != nil { + err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0) + if err != nil { + return nil, nil, err + } + } + err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes()) + if err != nil { + return nil, nil, err + } + + // Compose key that identifies this result. + key := getResultKey(rp) + + return key, b.Bytes(), nil +} + +// deserializeResult deserializes a payment result. +func deserializeResult(k, v []byte) (*paymentResult, error) { + // Parse payment id. + result := paymentResult{ + id: byteOrder.Uint64(k[8:]), + } + + r := bytes.NewReader(v) + + // Read timestamps, success status and failure source index. + var ( + timeFwd, timeReply uint64 + dbFailureSourceIdx int32 + ) + + err := channeldb.ReadElements( + r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx, + ) + if err != nil { + return nil, err + } + + // Convert time stamps to local time zone for consistent logging. + result.timeFwd = time.Unix(0, int64(timeFwd)).Local() + result.timeReply = time.Unix(0, int64(timeReply)).Local() + + // Convert from unknown index magic number to nil value. + if dbFailureSourceIdx != unknownFailureSourceIdx { + failureSourceIdx := int(dbFailureSourceIdx) + result.failureSourceIdx = &failureSourceIdx + } + + // Read route. + route, err := channeldb.DeserializeRoute(r) + if err != nil { + return nil, err + } + result.route = &route + + // Read failure. + failureBytes, err := wire.ReadVarBytes( + r, 0, lnwire.FailureMessageLength, "failure", + ) + if err != nil { + return nil, err + } + if len(failureBytes) > 0 { + result.failure, err = lnwire.DecodeFailureMessage( + bytes.NewReader(failureBytes), 0, + ) + if err != nil { + return nil, err + } + } + + return &result, nil +} + +// AddResult adds a new result to the db. +func (b *missionControlStore) AddResult(rp *paymentResult) error { + return b.db.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(resultsKey) + + // Prune oldest entries. + if b.maxRecords > 0 { + for b.numRecords >= b.maxRecords { + cursor := bucket.Cursor() + cursor.First() + if err := cursor.Delete(); err != nil { + return err + } + + b.numRecords-- + } + } + + // Serialize result into key and value byte slices. + k, v, err := serializeResult(rp) + if err != nil { + return err + } + + // The store is assumed to be idempotent. It could be that the + // same result is added twice and in that case the counter + // shouldn't be increased. + if bucket.Get(k) == nil { + b.numRecords++ + } + + // Put into results bucket. + return bucket.Put(k, v) + }) +} + +// getResultKey returns a byte slice representing a unique key for this payment +// result. +func getResultKey(rp *paymentResult) []byte { + var keyBytes [8 + 8 + 33]byte + + // Identify records by a combination of time, payment id and sender pub + // key. This allows importing mission control data from an external + // source without key collisions and keeps the records sorted + // chronologically. + byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano())) + byteOrder.PutUint64(keyBytes[8:], rp.id) + copy(keyBytes[16:], rp.route.SourcePubKey[:]) + + return keyBytes[:] +} diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go new file mode 100644 index 00000000..43d150fa --- /dev/null +++ b/routing/missioncontrol_store_test.go @@ -0,0 +1,140 @@ +package routing + +import ( + "io/ioutil" + "os" + "reflect" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/routing/route" +) + +const testMaxRecords = 2 + +func TestMissionControlStore(t *testing.T) { + // Set time zone explictly to keep test deterministic. + time.Local = time.UTC + + file, err := ioutil.TempFile("", "*.db") + if err != nil { + t.Fatal(err) + } + + dbPath := file.Name() + + db, err := bbolt.Open(dbPath, 0600, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + defer os.Remove(dbPath) + + store, err := newMissionControlStore(db, testMaxRecords) + if err != nil { + t.Fatal(err) + } + + results, err := store.fetchAll() + if err != nil { + t.Fatal(err) + } + if len(results) != 0 { + t.Fatal("expected no results") + } + + testRoute := route.Route{ + SourcePubKey: route.Vertex{1}, + Hops: []*route.Hop{ + { + PubKeyBytes: route.Vertex{2}, + }, + }, + } + + failureSourceIdx := 1 + + result1 := paymentResult{ + route: &testRoute, + failure: lnwire.NewFailUnknownPaymentHash(100), + failureSourceIdx: &failureSourceIdx, + id: 99, + timeReply: testTime, + timeFwd: testTime.Add(-time.Minute), + } + + result2 := result1 + result2.timeReply = result1.timeReply.Add(time.Hour) + result2.timeFwd = result1.timeReply.Add(time.Hour) + result2.id = 2 + + // Store result. + err = store.AddResult(&result2) + if err != nil { + t.Fatal(err) + } + + // Store again to test idempotency. + err = store.AddResult(&result2) + if err != nil { + t.Fatal(err) + } + + // Store second result which has an earlier timestamp. + err = store.AddResult(&result1) + if err != nil { + t.Fatal(err) + } + + results, err = store.fetchAll() + if err != nil { + t.Fatal(err) + } + if len(results) != 2 { + t.Fatal("expected two results") + } + + // Check that results are stored in chronological order. + if !reflect.DeepEqual(&result1, results[0]) { + t.Fatal() + } + if !reflect.DeepEqual(&result2, results[1]) { + t.Fatal() + } + + // Recreate store to test pruning. + store, err = newMissionControlStore(db, testMaxRecords) + if err != nil { + t.Fatal(err) + } + + // Add a newer result. + result3 := result1 + result3.timeReply = result1.timeReply.Add(2 * time.Hour) + result3.timeFwd = result1.timeReply.Add(2 * time.Hour) + result3.id = 3 + + err = store.AddResult(&result3) + if err != nil { + t.Fatal(err) + } + + // Check that results are pruned. + results, err = store.fetchAll() + if err != nil { + t.Fatal(err) + } + if len(results) != 2 { + t.Fatal("expected two results") + } + + if !reflect.DeepEqual(&result2, results[0]) { + t.Fatal() + } + if !reflect.DeepEqual(&result3, results[1]) { + t.Fatal() + } +} diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 1675b044..798161c9 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -1,9 +1,12 @@ package routing import ( + "io/ioutil" + "os" "testing" "time" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -37,6 +40,10 @@ type mcTestContext struct { t *testing.T mc *MissionControl now time.Time + + db *bbolt.DB + dbPath string + pid uint64 } @@ -46,17 +53,44 @@ func createMcTestContext(t *testing.T) *mcTestContext { now: mcTestTime, } - mc := NewMissionControl( + file, err := ioutil.TempFile("", "*.db") + if err != nil { + t.Fatal(err) + } + + ctx.dbPath = file.Name() + + ctx.db, err = bbolt.Open(ctx.dbPath, 0600, nil) + if err != nil { + t.Fatal(err) + } + + ctx.restartMc() + + return ctx +} + +// restartMc creates a new instances of mission control on the same database. +func (ctx *mcTestContext) restartMc() { + mc, err := NewMissionControl( + ctx.db, &MissionControlConfig{ PenaltyHalfLife: 30 * time.Minute, AprioriHopProbability: 0.8, }, ) + if err != nil { + ctx.t.Fatal(err) + } mc.now = func() time.Time { return ctx.now } ctx.mc = mc +} - return ctx +// cleanup closes the database and removes the temp file. +func (ctx *mcTestContext) cleanup() { + ctx.db.Close() + os.Remove(ctx.dbPath) } // Assert that mission control returns a probability for an edge. @@ -86,6 +120,7 @@ func (ctx *mcTestContext) reportFailure(t time.Time, // TestMissionControl tests mission control probability estimation. func TestMissionControl(t *testing.T) { ctx := createMcTestContext(t) + defer ctx.cleanup() ctx.now = testTime @@ -122,6 +157,10 @@ func TestMissionControl(t *testing.T) { ctx.now = testTime.Add(60 * time.Minute) ctx.expectP(1000, 0.4) + // Restart mission control to test persistence. + ctx.restartMc() + ctx.expectP(1000, 0.4) + // A node level failure should bring probability of every channel back // to zero. ctx.reportFailure( @@ -145,6 +184,7 @@ func TestMissionControl(t *testing.T) { // penalizing the channel yet. func TestMissionControlChannelUpdate(t *testing.T) { ctx := createMcTestContext(t) + defer ctx.cleanup() // Report a policy related failure. Because it is the first, we don't // expect a penalty. diff --git a/routing/mock_test.go b/routing/mock_test.go index b0444546..54a8011d 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -100,9 +100,9 @@ var _ MissionController = (*mockMissionControl)(nil) func (m *mockMissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) ( - bool, channeldb.FailureReason) { + bool, channeldb.FailureReason, error) { - return false, 0 + return false, 0, nil } func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, diff --git a/routing/router.go b/routing/router.go index edc3aa3a..146e06ca 100644 --- a/routing/router.go +++ b/routing/router.go @@ -180,7 +180,7 @@ type MissionController interface { // need to be made. ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, - channeldb.FailureReason) + channeldb.FailureReason, error) // GetEdgeProbability is expected to return the success probability of a // payment from fromNode along edge. @@ -1896,12 +1896,27 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route, sendErr error) (bool, channeldb.FailureReason) { + reportFail := func(srcIdx *int, msg lnwire.FailureMessage) (bool, + channeldb.FailureReason) { + + // Report outcome to mission control. + final, reason, err := r.cfg.MissionControl.ReportPaymentFail( + paymentID, rt, srcIdx, msg, + ) + if err != nil { + log.Errorf("Error reporting payment result to mc: %v", + err) + + return true, channeldb.FailureReasonError + } + + return final, reason + } + if sendErr == htlcswitch.ErrUnreadableFailureMessage { log.Tracef("Unreadable failure when sending htlc") - return r.cfg.MissionControl.ReportPaymentFail( - paymentID, rt, nil, nil, - ) + return reportFail(nil, nil) } // If an internal, non-forwarding error occurred, we can stop // trying. @@ -1927,9 +1942,7 @@ func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route, log.Tracef("Node=%v reported failure when sending htlc", failureSourceIdx) - return r.cfg.MissionControl.ReportPaymentFail( - paymentID, rt, &failureSourceIdx, failureMessage, - ) + return reportFail(&failureSourceIdx, failureMessage) } // extractChannelUpdate examines the error and extracts the channel update. diff --git a/routing/router_test.go b/routing/router_test.go index d0383c09..28ab1692 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -90,7 +90,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr return nil, nil, err } - pathFindingConfig := &PathFindingConfig{ + pathFindingConfig := PathFindingConfig{ MinProbability: 0.01, PaymentAttemptPenalty: 100, } @@ -100,9 +100,13 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr AprioriHopProbability: 0.9, } - mc := NewMissionControl( + mc, err := NewMissionControl( + graphInstance.graph.Database().DB, mcConfig, ) + if err != nil { + return nil, nil, err + } sessionSource := &SessionSource{ Graph: graphInstance.graph, @@ -110,11 +114,8 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { return lnwire.NewMSatFromSatoshis(e.Capacity) }, - PathFindingConfig: PathFindingConfig{ - MinProbability: 0.01, - PaymentAttemptPenalty: 100, - }, - MissionControl: mc, + PathFindingConfig: pathFindingConfig, + MissionControl: mc, } router, err := New(Config{ @@ -134,7 +135,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr next := atomic.AddUint64(&uniquePaymentID, 1) return next, nil }, - PathFindingConfig: *pathFindingConfig, + PathFindingConfig: pathFindingConfig, }) if err != nil { return nil, nil, fmt.Errorf("unable to create router %v", err) diff --git a/server.go b/server.go index 658b47f9..da2c51cb 100644 --- a/server.go +++ b/server.go @@ -653,12 +653,17 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, // servers, the mission control instance itself can be moved there too. routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) - s.missionControl = routing.NewMissionControl( + s.missionControl, err = routing.NewMissionControl( + chanDB.DB, &routing.MissionControlConfig{ AprioriHopProbability: routingConfig.AprioriHopProbability, PenaltyHalfLife: routingConfig.PenaltyHalfLife, + MaxMcHistory: routingConfig.MaxMcHistory, }, ) + if err != nil { + return nil, fmt.Errorf("can't create mission control: %v", err) + } srvrLog.Debugf("Instantiating payment session source with config: "+ "PaymentAttemptPenalty=%v, MinRouteProbability=%v",