Merge pull request #3164 from joostjager/persistent-mc
routing: persistent mission control
This commit is contained in:
commit
7767eecbb8
@ -484,7 +484,7 @@ func serializePaymentAttemptInfo(w io.Writer, a *PaymentAttemptInfo) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := serializeRoute(w, a.Route); err != nil {
|
||||
if err := SerializeRoute(w, a.Route); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -497,7 +497,7 @@ func deserializePaymentAttemptInfo(r io.Reader) (*PaymentAttemptInfo, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
a.Route, err = deserializeRoute(r)
|
||||
a.Route, err = DeserializeRoute(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -533,7 +533,8 @@ func deserializeHop(r io.Reader) (*route.Hop, error) {
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func serializeRoute(w io.Writer, r route.Route) error {
|
||||
// SerializeRoute serializes a route.
|
||||
func SerializeRoute(w io.Writer, r route.Route) error {
|
||||
if err := WriteElements(w,
|
||||
r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
|
||||
); err != nil {
|
||||
@ -553,7 +554,8 @@ func serializeRoute(w io.Writer, r route.Route) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func deserializeRoute(r io.Reader) (route.Route, error) {
|
||||
// DeserializeRoute deserializes a route.
|
||||
func DeserializeRoute(r io.Reader) (route.Route, error) {
|
||||
rt := route.Route{}
|
||||
if err := ReadElements(r,
|
||||
&rt.TotalTimeLock, &rt.TotalAmount,
|
||||
|
@ -203,12 +203,12 @@ func TestRouteSerialization(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var b bytes.Buffer
|
||||
if err := serializeRoute(&b, testRoute); err != nil {
|
||||
if err := SerializeRoute(&b, testRoute); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := bytes.NewReader(b.Bytes())
|
||||
route2, err := deserializeRoute(r)
|
||||
route2, err := DeserializeRoute(r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3,26 +3,29 @@ package routerrpc
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/btcsuite/btcutil"
|
||||
)
|
||||
|
||||
// RoutingConfig contains the configurable parameters that control routing.
|
||||
type RoutingConfig struct {
|
||||
// PenaltyHalfLife defines after how much time a penalized node or
|
||||
// channel is back at 50% probability.
|
||||
PenaltyHalfLife time.Duration
|
||||
|
||||
// PaymentAttemptPenalty is the virtual cost in path finding weight
|
||||
// units of executing a payment attempt that fails. It is used to trade
|
||||
// off potentially better routes against their probability of
|
||||
// succeeding.
|
||||
PaymentAttemptPenalty lnwire.MilliSatoshi
|
||||
|
||||
// MinProbability defines the minimum success probability of the
|
||||
// returned route.
|
||||
MinRouteProbability float64
|
||||
// MinRouteProbability is the minimum required route success probability
|
||||
// to attempt the payment.
|
||||
MinRouteProbability float64 `long:"minrtprob" description:"Minimum required route success probability to attempt the payment"`
|
||||
|
||||
// AprioriHopProbability is the assumed success probability of a hop in
|
||||
// a route when no other information is available.
|
||||
AprioriHopProbability float64
|
||||
AprioriHopProbability float64 `long:"apriorihopprob" description:"Assumed success probability of a hop in a route when no other information is available."`
|
||||
|
||||
// PenaltyHalfLife defines after how much time a penalized node or
|
||||
// channel is back at 50% probability.
|
||||
PenaltyHalfLife time.Duration `long:"penaltyhalflife" description:"Defines the duration after which a penalized node or channel is back at 50% probability"`
|
||||
|
||||
// AttemptCost is the virtual cost in path finding weight units of
|
||||
// executing a payment attempt that fails. It is used to trade off
|
||||
// potentially better routes against their probability of succeeding.
|
||||
AttemptCost btcutil.Amount `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"`
|
||||
|
||||
// MaxMcHistory defines the maximum number of payment results that
|
||||
// are held on disk by mission control.
|
||||
MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"`
|
||||
}
|
||||
|
@ -3,10 +3,6 @@
|
||||
package routerrpc
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcutil"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/macaroons"
|
||||
"github.com/lightningnetwork/lnd/routing"
|
||||
)
|
||||
@ -17,28 +13,13 @@ import (
|
||||
// options, while if able to be populated, the latter fields MUST also be
|
||||
// specified.
|
||||
type Config struct {
|
||||
RoutingConfig
|
||||
|
||||
// RouterMacPath is the path for the router macaroon. If unspecified
|
||||
// then we assume that the macaroon will be found under the network
|
||||
// directory, named DefaultRouterMacFilename.
|
||||
RouterMacPath string `long:"routermacaroonpath" description:"Path to the router macaroon"`
|
||||
|
||||
// MinProbability is the minimum required route success probability to
|
||||
// attempt the payment.
|
||||
MinRouteProbability float64 `long:"minrtprob" description:"Minimum required route success probability to attempt the payment"`
|
||||
|
||||
// AprioriHopProbability is the assumed success probability of a hop in
|
||||
// a route when no other information is available.
|
||||
AprioriHopProbability float64 `long:"apriorihopprob" description:"Assumed success probability of a hop in a route when no other information is available."`
|
||||
|
||||
// PenaltyHalfLife defines after how much time a penalized node or
|
||||
// channel is back at 50% probability.
|
||||
PenaltyHalfLife time.Duration `long:"penaltyhalflife" description:"Defines the duration after which a penalized node or channel is back at 50% probability"`
|
||||
|
||||
// AttemptCost is the virtual cost in path finding weight units of
|
||||
// executing a payment attempt that fails. It is used to trade off
|
||||
// potentially better routes against their probability of succeeding.
|
||||
AttemptCost int64 `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"`
|
||||
|
||||
// NetworkDir is the main network directory wherein the router rpc
|
||||
// server will find the macaroon named DefaultRouterMacFilename.
|
||||
NetworkDir string
|
||||
@ -62,13 +43,17 @@ type Config struct {
|
||||
|
||||
// DefaultConfig defines the config defaults.
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
defaultRoutingConfig := RoutingConfig{
|
||||
AprioriHopProbability: routing.DefaultAprioriHopProbability,
|
||||
MinRouteProbability: routing.DefaultMinRouteProbability,
|
||||
PenaltyHalfLife: routing.DefaultPenaltyHalfLife,
|
||||
AttemptCost: int64(
|
||||
routing.DefaultPaymentAttemptPenalty.ToSatoshis(),
|
||||
),
|
||||
AttemptCost: routing.DefaultPaymentAttemptPenalty.
|
||||
ToSatoshis(),
|
||||
MaxMcHistory: routing.DefaultMaxMcHistory,
|
||||
}
|
||||
|
||||
return &Config{
|
||||
RoutingConfig: defaultRoutingConfig,
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,9 +62,8 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
|
||||
return &RoutingConfig{
|
||||
AprioriHopProbability: cfg.AprioriHopProbability,
|
||||
MinRouteProbability: cfg.MinRouteProbability,
|
||||
PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis(
|
||||
btcutil.Amount(cfg.AttemptCost),
|
||||
),
|
||||
PenaltyHalfLife: cfg.PenaltyHalfLife,
|
||||
AttemptCost: cfg.AttemptCost,
|
||||
PenaltyHalfLife: cfg.PenaltyHalfLife,
|
||||
MaxMcHistory: cfg.MaxMcHistory,
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,9 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
|
||||
return &RoutingConfig{
|
||||
AprioriHopProbability: routing.DefaultAprioriHopProbability,
|
||||
MinRouteProbability: routing.DefaultMinRouteProbability,
|
||||
PaymentAttemptPenalty: routing.DefaultPaymentAttemptPenalty,
|
||||
PenaltyHalfLife: routing.DefaultPenaltyHalfLife,
|
||||
AttemptCost: routing.DefaultPaymentAttemptPenalty.
|
||||
ToSatoshis(),
|
||||
PenaltyHalfLife: routing.DefaultPenaltyHalfLife,
|
||||
MaxMcHistory: routing.DefaultMaxMcHistory,
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ type MissionControl interface {
|
||||
|
||||
// ResetHistory resets the history of MissionControl returning it to a state as
|
||||
// if no payment attempts have been made.
|
||||
ResetHistory()
|
||||
ResetHistory() error
|
||||
|
||||
// GetHistorySnapshot takes a snapshot from the current mission control state
|
||||
// and actual probability estimates.
|
||||
|
@ -147,7 +147,9 @@ func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex,
|
||||
return testMissionControlProb
|
||||
}
|
||||
|
||||
func (m *mockMissionControl) ResetHistory() {}
|
||||
func (m *mockMissionControl) ResetHistory() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockMissionControl) GetHistorySnapshot() *routing.MissionControlSnapshot {
|
||||
return nil
|
||||
|
@ -436,7 +436,10 @@ func marshallChannelUpdate(update *lnwire.ChannelUpdate) *ChannelUpdate {
|
||||
func (s *Server) ResetMissionControl(ctx context.Context,
|
||||
req *ResetMissionControlRequest) (*ResetMissionControlResponse, error) {
|
||||
|
||||
s.cfg.RouterBackend.MissionControl.ResetHistory()
|
||||
err := s.cfg.RouterBackend.MissionControl.ResetHistory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ResetMissionControlResponse{}, nil
|
||||
}
|
||||
|
@ -1113,10 +1113,16 @@ func DecodeFailure(r io.Reader, pver uint32) (FailureMessage, error) {
|
||||
|
||||
dataReader := bytes.NewReader(failureData)
|
||||
|
||||
return DecodeFailureMessage(dataReader, pver)
|
||||
}
|
||||
|
||||
// DecodeFailureMessage decodes just the failure message, ignoring any padding
|
||||
// that may be present at the end.
|
||||
func DecodeFailureMessage(r io.Reader, pver uint32) (FailureMessage, error) {
|
||||
// Once we have the failure data, we can obtain the failure code from
|
||||
// the first two bytes of the buffer.
|
||||
var codeBytes [2]byte
|
||||
if _, err := io.ReadFull(dataReader, codeBytes[:]); err != nil {
|
||||
if _, err := io.ReadFull(r, codeBytes[:]); err != nil {
|
||||
return nil, fmt.Errorf("unable to read failure code: %v", err)
|
||||
}
|
||||
failCode := FailCode(binary.BigEndian.Uint16(codeBytes[:]))
|
||||
@ -1132,10 +1138,9 @@ func DecodeFailure(r io.Reader, pver uint32) (FailureMessage, error) {
|
||||
// well.
|
||||
switch f := failure.(type) {
|
||||
case Serializable:
|
||||
if err := f.Decode(dataReader, pver); err != nil {
|
||||
if err := f.Decode(r, pver); err != nil {
|
||||
return nil, fmt.Errorf("unable to decode error "+
|
||||
"update (type=%T, len_bytes=%v, bytes=%x): %v",
|
||||
failure, failureLength, failureData[:], err)
|
||||
"update (type=%T): %v", failure, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1147,26 +1152,11 @@ func DecodeFailure(r io.Reader, pver uint32) (FailureMessage, error) {
|
||||
func EncodeFailure(w io.Writer, failure FailureMessage, pver uint32) error {
|
||||
var failureMessageBuffer bytes.Buffer
|
||||
|
||||
// First, we'll write out the error code itself into the failure
|
||||
// buffer.
|
||||
var codeBytes [2]byte
|
||||
code := uint16(failure.Code())
|
||||
binary.BigEndian.PutUint16(codeBytes[:], code)
|
||||
_, err := failureMessageBuffer.Write(codeBytes[:])
|
||||
err := EncodeFailureMessage(&failureMessageBuffer, failure, pver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Next, some message have an additional message payload, if this is
|
||||
// one of those types, then we'll also encode the error payload as
|
||||
// well.
|
||||
switch failure := failure.(type) {
|
||||
case Serializable:
|
||||
if err := failure.Encode(&failureMessageBuffer, pver); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// The combined size of this message must be below the max allowed
|
||||
// failure message length.
|
||||
failureMessage := failureMessageBuffer.Bytes()
|
||||
@ -1187,6 +1177,32 @@ func EncodeFailure(w io.Writer, failure FailureMessage, pver uint32) error {
|
||||
)
|
||||
}
|
||||
|
||||
// EncodeFailureMessage encodes just the failure message without adding a length
|
||||
// and padding the message for the onion protocol.
|
||||
func EncodeFailureMessage(w io.Writer, failure FailureMessage, pver uint32) error {
|
||||
// First, we'll write out the error code itself into the failure
|
||||
// buffer.
|
||||
var codeBytes [2]byte
|
||||
code := uint16(failure.Code())
|
||||
binary.BigEndian.PutUint16(codeBytes[:], code)
|
||||
_, err := w.Write(codeBytes[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Next, some message have an additional message payload, if this is
|
||||
// one of those types, then we'll also encode the error payload as
|
||||
// well.
|
||||
switch failure := failure.(type) {
|
||||
case Serializable:
|
||||
if err := failure.Encode(w, pver); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// makeEmptyOnionError creates a new empty onion error of the proper concrete
|
||||
// type based on the passed failure code.
|
||||
func makeEmptyOnionError(code FailCode) (FailureMessage, error) {
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
)
|
||||
@ -38,6 +40,9 @@ const (
|
||||
// Nodes forward non-strict, so it isn't necessary to apply a less
|
||||
// restrictive channel level tracking scheme here.
|
||||
minSecondChanceInterval = time.Minute
|
||||
|
||||
// DefaultMaxMcHistory is the default maximum history size.
|
||||
DefaultMaxMcHistory = 1000
|
||||
)
|
||||
|
||||
// MissionControl contains state which summarizes the past attempts of HTLC
|
||||
@ -62,6 +67,8 @@ type MissionControl struct {
|
||||
|
||||
cfg *MissionControlConfig
|
||||
|
||||
store *missionControlStore
|
||||
|
||||
sync.Mutex
|
||||
|
||||
// TODO(roasbeef): further counters, if vertex continually unavailable,
|
||||
@ -80,6 +87,10 @@ type MissionControlConfig struct {
|
||||
// AprioriHopProbability is the assumed success probability of a hop in
|
||||
// a route when no other information is available.
|
||||
AprioriHopProbability float64
|
||||
|
||||
// MaxMcHistory defines the maximum number of payment results that are
|
||||
// held on disk.
|
||||
MaxMcHistory int
|
||||
}
|
||||
|
||||
// nodeHistory contains a summary of payment attempt outcomes involving a
|
||||
@ -146,30 +157,82 @@ type MissionControlChannelSnapshot struct {
|
||||
SuccessProb float64
|
||||
}
|
||||
|
||||
// paymentResult is the information that becomes available when a payment
|
||||
// attempt completes.
|
||||
type paymentResult struct {
|
||||
id uint64
|
||||
timeFwd, timeReply time.Time
|
||||
route *route.Route
|
||||
success bool
|
||||
failureSourceIdx *int
|
||||
failure lnwire.FailureMessage
|
||||
}
|
||||
|
||||
// NewMissionControl returns a new instance of missionControl.
|
||||
func NewMissionControl(cfg *MissionControlConfig) *MissionControl {
|
||||
func NewMissionControl(db *bbolt.DB, cfg *MissionControlConfig) (
|
||||
*MissionControl, error) {
|
||||
|
||||
log.Debugf("Instantiating mission control with config: "+
|
||||
"PenaltyHalfLife=%v, AprioriHopProbability=%v",
|
||||
cfg.PenaltyHalfLife, cfg.AprioriHopProbability)
|
||||
|
||||
return &MissionControl{
|
||||
store, err := newMissionControlStore(db, cfg.MaxMcHistory)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mc := &MissionControl{
|
||||
history: make(map[route.Vertex]*nodeHistory),
|
||||
lastSecondChance: make(map[DirectedNodePair]time.Time),
|
||||
now: time.Now,
|
||||
cfg: cfg,
|
||||
store: store,
|
||||
}
|
||||
|
||||
if err := mc.init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return mc, nil
|
||||
}
|
||||
|
||||
// init initializes mission control with historical data.
|
||||
func (m *MissionControl) init() error {
|
||||
log.Debugf("Mission control state reconstruction started")
|
||||
|
||||
start := time.Now()
|
||||
|
||||
results, err := m.store.fetchAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
m.applyPaymentResult(result)
|
||||
}
|
||||
|
||||
log.Debugf("Mission control state reconstruction finished: "+
|
||||
"n=%v, time=%v", len(results), time.Now().Sub(start))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetHistory resets the history of MissionControl returning it to a state as
|
||||
// if no payment attempts have been made.
|
||||
func (m *MissionControl) ResetHistory() {
|
||||
func (m *MissionControl) ResetHistory() error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if err := m.store.clear(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.history = make(map[route.Vertex]*nodeHistory)
|
||||
m.lastSecondChance = make(map[DirectedNodePair]time.Time)
|
||||
|
||||
log.Debugf("Mission control history cleared")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEdgeProbability is expected to return the success probability of a payment
|
||||
@ -282,22 +345,22 @@ func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHist
|
||||
return node
|
||||
}
|
||||
|
||||
// ReportVertexFailure reports a node level failure.
|
||||
func (m *MissionControl) ReportVertexFailure(v route.Vertex) {
|
||||
log.Debugf("Reporting vertex %v failure to Mission Control", v)
|
||||
// reportVertexFailure reports a node level failure.
|
||||
func (m *MissionControl) reportVertexFailure(timestamp time.Time,
|
||||
v route.Vertex) {
|
||||
|
||||
now := m.now()
|
||||
log.Debugf("Reporting vertex %v failure to Mission Control", v)
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
history := m.createHistoryIfNotExists(v)
|
||||
history.lastFail = &now
|
||||
history.lastFail = ×tamp
|
||||
}
|
||||
|
||||
// ReportEdgePolicyFailure reports a policy related failure.
|
||||
func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) {
|
||||
now := m.now()
|
||||
// reportEdgePolicyFailure reports a policy related failure.
|
||||
func (m *MissionControl) reportEdgePolicyFailure(timestamp time.Time,
|
||||
failedEdge edge) {
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
@ -306,32 +369,30 @@ func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) {
|
||||
// immediately. If some time has passed since the last policy failure,
|
||||
// we grant the node a second chance at forwarding the payment.
|
||||
if m.requestSecondChance(
|
||||
now, failedEdge.from, failedEdge.to,
|
||||
timestamp, failedEdge.from, failedEdge.to,
|
||||
) {
|
||||
return
|
||||
}
|
||||
|
||||
history := m.createHistoryIfNotExists(failedEdge.from)
|
||||
history.lastFail = &now
|
||||
history.lastFail = ×tamp
|
||||
}
|
||||
|
||||
// ReportEdgeFailure reports a channel level failure.
|
||||
// reportEdgeFailure reports a channel level failure.
|
||||
//
|
||||
// TODO(roasbeef): also add value attempted to send and capacity of channel
|
||||
func (m *MissionControl) ReportEdgeFailure(failedEdge edge,
|
||||
func (m *MissionControl) reportEdgeFailure(timestamp time.Time, failedEdge edge,
|
||||
minPenalizeAmt lnwire.MilliSatoshi) {
|
||||
|
||||
log.Debugf("Reporting channel %v failure to Mission Control",
|
||||
failedEdge.channel)
|
||||
|
||||
now := m.now()
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
history := m.createHistoryIfNotExists(failedEdge.from)
|
||||
history.channelLastFail[failedEdge.channel] = &channelHistory{
|
||||
lastFail: now,
|
||||
lastFail: timestamp,
|
||||
minPenalizeAmt: minPenalizeAmt,
|
||||
}
|
||||
}
|
||||
@ -387,3 +448,286 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
|
||||
|
||||
return &snapshot
|
||||
}
|
||||
|
||||
// ReportPaymentFail reports a failed payment to mission control as input for
|
||||
// future probability estimates. The failureSourceIdx argument indicates the
|
||||
// failure source. If it is nil, the failure source is unknown. This function
|
||||
// returns a bool indicating whether this error is a final error. If it is
|
||||
// final, a failure reason is returned and no further payment attempts need to
|
||||
// be made.
|
||||
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
|
||||
failureSourceIdx *int, failure lnwire.FailureMessage) (bool,
|
||||
channeldb.FailureReason, error) {
|
||||
|
||||
timestamp := m.now()
|
||||
|
||||
// TODO(joostjager): Use actual payment initiation time for timeFwd.
|
||||
result := &paymentResult{
|
||||
success: false,
|
||||
timeFwd: timestamp,
|
||||
timeReply: timestamp,
|
||||
id: paymentID,
|
||||
failureSourceIdx: failureSourceIdx,
|
||||
failure: failure,
|
||||
route: rt,
|
||||
}
|
||||
|
||||
// Store complete result in database.
|
||||
if err := m.store.AddResult(result); err != nil {
|
||||
return false, 0, err
|
||||
}
|
||||
|
||||
// Apply result to update mission control state.
|
||||
final, reason := m.applyPaymentResult(result)
|
||||
|
||||
return final, reason, nil
|
||||
}
|
||||
|
||||
// applyPaymentResult applies a payment result as input for future probability
|
||||
// estimates. It returns a bool indicating whether this error is a final error
|
||||
// and no further payment attempts need to be made.
|
||||
func (m *MissionControl) applyPaymentResult(result *paymentResult) (
|
||||
bool, channeldb.FailureReason) {
|
||||
|
||||
var (
|
||||
failureSourceIdxInt int
|
||||
failure lnwire.FailureMessage
|
||||
)
|
||||
|
||||
if result.failureSourceIdx == nil {
|
||||
// If the failure message could not be decrypted, attribute the
|
||||
// failure to our own outgoing channel.
|
||||
//
|
||||
// TODO(joostager): Penalize all channels in the route.
|
||||
failureSourceIdxInt = 0
|
||||
failure = lnwire.NewTemporaryChannelFailure(nil)
|
||||
} else {
|
||||
failureSourceIdxInt = *result.failureSourceIdx
|
||||
failure = result.failure
|
||||
}
|
||||
|
||||
var failureVertex route.Vertex
|
||||
|
||||
if failureSourceIdxInt > 0 {
|
||||
failureVertex = result.route.Hops[failureSourceIdxInt-1].PubKeyBytes
|
||||
} else {
|
||||
failureVertex = result.route.SourcePubKey
|
||||
}
|
||||
log.Tracef("Node %x (index %v) reported failure when sending htlc",
|
||||
failureVertex, result.failureSourceIdx)
|
||||
|
||||
// Always determine chan id ourselves, because a channel update with id
|
||||
// may not be available.
|
||||
failedEdge, failedAmt := getFailedEdge(
|
||||
result.route, failureSourceIdxInt,
|
||||
)
|
||||
|
||||
switch failure.(type) {
|
||||
|
||||
// If the end destination didn't know the payment
|
||||
// hash or we sent the wrong payment amount to the
|
||||
// destination, then we'll terminate immediately.
|
||||
case *lnwire.FailUnknownPaymentHash:
|
||||
// TODO(joostjager): Check onionErr.Amount() whether it matches
|
||||
// what we expect. (Will it ever not match, because if not
|
||||
// final_incorrect_htlc_amount would be returned?)
|
||||
|
||||
return true, channeldb.FailureReasonIncorrectPaymentDetails
|
||||
|
||||
// If we sent the wrong amount to the destination, then
|
||||
// we'll exit early.
|
||||
case *lnwire.FailIncorrectPaymentAmount:
|
||||
return true, channeldb.FailureReasonIncorrectPaymentDetails
|
||||
|
||||
// If the time-lock that was extended to the final node
|
||||
// was incorrect, then we can't proceed.
|
||||
case *lnwire.FailFinalIncorrectCltvExpiry:
|
||||
// TODO(joostjager): Take into account that second last hop may
|
||||
// have deliberately handed out an htlc that expires too soon.
|
||||
// In that case we should continue routing.
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// If we crafted an invalid onion payload for the final
|
||||
// node, then we'll exit early.
|
||||
case *lnwire.FailFinalIncorrectHtlcAmount:
|
||||
// TODO(joostjager): Take into account that second last hop may
|
||||
// have deliberately handed out an htlc with a too low value. In
|
||||
// that case we should continue routing.
|
||||
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// Similarly, if the HTLC expiry that we extended to
|
||||
// the final hop expires too soon, then will fail the
|
||||
// payment.
|
||||
//
|
||||
// TODO(roasbeef): can happen to to race condition, try
|
||||
// again with recent block height
|
||||
case *lnwire.FailFinalExpiryTooSoon:
|
||||
// TODO(joostjager): Take into account that any hop may have
|
||||
// delayed. Ideally we should continue routing. Knowing the
|
||||
// delaying node at this point would help.
|
||||
return true, channeldb.FailureReasonIncorrectPaymentDetails
|
||||
|
||||
// If we erroneously attempted to cross a chain border,
|
||||
// then we'll cancel the payment.
|
||||
case *lnwire.FailInvalidRealm:
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// If we get a notice that the expiry was too soon for
|
||||
// an intermediate node, then we'll prune out the node
|
||||
// that sent us this error, as it doesn't now what the
|
||||
// correct block height is.
|
||||
case *lnwire.FailExpiryTooSoon:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If we hit an instance of onion payload corruption or an invalid
|
||||
// version, then we'll exit early as this shouldn't happen in the
|
||||
// typical case.
|
||||
//
|
||||
// TODO(joostjager): Take into account that the previous hop may have
|
||||
// tampered with the onion. Routing should continue using other paths.
|
||||
case *lnwire.FailInvalidOnionVersion:
|
||||
return true, channeldb.FailureReasonError
|
||||
case *lnwire.FailInvalidOnionHmac:
|
||||
return true, channeldb.FailureReasonError
|
||||
case *lnwire.FailInvalidOnionKey:
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// If we get a failure due to violating the minimum
|
||||
// amount, we'll apply the new minimum amount and retry
|
||||
// routing.
|
||||
case *lnwire.FailAmountBelowMinimum:
|
||||
m.reportEdgePolicyFailure(result.timeReply, failedEdge)
|
||||
return false, 0
|
||||
|
||||
// If we get a failure due to a fee, we'll apply the
|
||||
// new fee update, and retry our attempt using the
|
||||
// newly updated fees.
|
||||
case *lnwire.FailFeeInsufficient:
|
||||
m.reportEdgePolicyFailure(result.timeReply, failedEdge)
|
||||
return false, 0
|
||||
|
||||
// If we get the failure for an intermediate node that
|
||||
// disagrees with our time lock values, then we'll
|
||||
// apply the new delta value and try it once more.
|
||||
case *lnwire.FailIncorrectCltvExpiry:
|
||||
m.reportEdgePolicyFailure(result.timeReply, failedEdge)
|
||||
return false, 0
|
||||
|
||||
// The outgoing channel that this node was meant to
|
||||
// forward one is currently disabled, so we'll apply
|
||||
// the update and continue.
|
||||
case *lnwire.FailChannelDisabled:
|
||||
m.reportEdgeFailure(result.timeReply, failedEdge, 0)
|
||||
return false, 0
|
||||
|
||||
// It's likely that the outgoing channel didn't have
|
||||
// sufficient capacity, so we'll prune this edge for
|
||||
// now, and continue onwards with our path finding.
|
||||
case *lnwire.FailTemporaryChannelFailure:
|
||||
m.reportEdgeFailure(result.timeReply, failedEdge, failedAmt)
|
||||
return false, 0
|
||||
|
||||
// If the send fail due to a node not having the
|
||||
// required features, then we'll note this error and
|
||||
// continue.
|
||||
case *lnwire.FailRequiredNodeFeatureMissing:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If the send fail due to a node not having the
|
||||
// required features, then we'll note this error and
|
||||
// continue.
|
||||
case *lnwire.FailRequiredChannelFeatureMissing:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If the next hop in the route wasn't known or
|
||||
// offline, we'll only the channel which we attempted
|
||||
// to route over. This is conservative, and it can
|
||||
// handle faulty channels between nodes properly.
|
||||
// Additionally, this guards against routing nodes
|
||||
// returning errors in order to attempt to black list
|
||||
// another node.
|
||||
case *lnwire.FailUnknownNextPeer:
|
||||
m.reportEdgeFailure(result.timeReply, failedEdge, 0)
|
||||
return false, 0
|
||||
|
||||
// If the node wasn't able to forward for which ever
|
||||
// reason, then we'll note this and continue with the
|
||||
// routes.
|
||||
case *lnwire.FailTemporaryNodeFailure:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
|
||||
case *lnwire.FailPermanentNodeFailure:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If we crafted a route that contains a too long time
|
||||
// lock for an intermediate node, we'll prune the node.
|
||||
// As there currently is no way of knowing that node's
|
||||
// maximum acceptable cltv, we cannot take this
|
||||
// constraint into account during routing.
|
||||
//
|
||||
// TODO(joostjager): Record the rejected cltv and use
|
||||
// that as a hint during future path finding through
|
||||
// that node.
|
||||
case *lnwire.FailExpiryTooFar:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If we get a permanent channel or node failure, then
|
||||
// we'll prune the channel in both directions and
|
||||
// continue with the rest of the routes.
|
||||
case *lnwire.FailPermanentChannelFailure:
|
||||
m.reportEdgeFailure(result.timeReply, failedEdge, 0)
|
||||
m.reportEdgeFailure(result.timeReply, edge{
|
||||
from: failedEdge.to,
|
||||
to: failedEdge.from,
|
||||
channel: failedEdge.channel,
|
||||
}, 0)
|
||||
return false, 0
|
||||
|
||||
// Any other failure or an empty failure will get the node pruned.
|
||||
default:
|
||||
m.reportVertexFailure(result.timeReply, failureVertex)
|
||||
return false, 0
|
||||
}
|
||||
}
|
||||
|
||||
// getFailedEdge tries to locate the failing channel given a route and the
|
||||
// pubkey of the node that sent the failure. It will assume that the failure is
|
||||
// associated with the outgoing channel of the failing node. As a second result,
|
||||
// it returns the amount sent over the edge.
|
||||
func getFailedEdge(route *route.Route, failureSource int) (edge,
|
||||
lnwire.MilliSatoshi) {
|
||||
|
||||
// Determine if we have a failure from the final hop. If it is, we
|
||||
// assume that the failing channel is the incoming channel.
|
||||
//
|
||||
// TODO(joostjager): In this case, certain types of failures are not
|
||||
// expected. For example FailUnknownNextPeer. This could be a reason to
|
||||
// prune the node?
|
||||
if failureSource == len(route.Hops) {
|
||||
failureSource--
|
||||
}
|
||||
|
||||
// As this failure indicates that the target channel was unable to carry
|
||||
// this HTLC (for w/e reason), we'll return the _outgoing_ channel that
|
||||
// the source of the failure was meant to pass the HTLC along to.
|
||||
if failureSource == 0 {
|
||||
return edge{
|
||||
from: route.SourcePubKey,
|
||||
to: route.Hops[0].PubKeyBytes,
|
||||
channel: route.Hops[0].ChannelID,
|
||||
}, route.TotalAmount
|
||||
}
|
||||
|
||||
return edge{
|
||||
from: route.Hops[failureSource-1].PubKeyBytes,
|
||||
to: route.Hops[failureSource].PubKeyBytes,
|
||||
channel: route.Hops[failureSource].ChannelID,
|
||||
}, route.Hops[failureSource-1].AmtToForward
|
||||
}
|
||||
|
269
routing/missioncontrol_store.go
Normal file
269
routing/missioncontrol_store.go
Normal file
@ -0,0 +1,269 @@
|
||||
package routing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
var (
|
||||
// resultsKey is the fixed key under which the attempt results are
|
||||
// stored.
|
||||
resultsKey = []byte("missioncontrol-results")
|
||||
|
||||
// Big endian is the preferred byte order, due to cursor scans over
|
||||
// integer keys iterating in order.
|
||||
byteOrder = binary.BigEndian
|
||||
)
|
||||
|
||||
const (
|
||||
// unknownFailureSourceIdx is the database encoding of an unknown error
|
||||
// source.
|
||||
unknownFailureSourceIdx = -1
|
||||
)
|
||||
|
||||
// missionControlStore is a bolt db based implementation of a mission control
|
||||
// store. It stores the raw payment attempt data from which the internal mission
|
||||
// controls state can be rederived on startup. This allows the mission control
|
||||
// internal data structure to be changed without requiring a database migration.
|
||||
// Also changes to mission control parameters can be applied to historical data.
|
||||
// Finally, it enables importing raw data from an external source.
|
||||
type missionControlStore struct {
|
||||
db *bbolt.DB
|
||||
maxRecords int
|
||||
numRecords int
|
||||
}
|
||||
|
||||
func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore, error) {
|
||||
store := &missionControlStore{
|
||||
db: db,
|
||||
maxRecords: maxRecords,
|
||||
}
|
||||
|
||||
// Create buckets if not yet existing.
|
||||
err := db.Update(func(tx *bbolt.Tx) error {
|
||||
resultsBucket, err := tx.CreateBucketIfNotExists(resultsKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create results bucket: %v",
|
||||
err)
|
||||
}
|
||||
|
||||
// Count initial number of results and track this number in
|
||||
// memory to avoid calling Stats().KeyN. The reliability of
|
||||
// Stats() is doubtful and seemed to have caused crashes in the
|
||||
// past (see #1874).
|
||||
c := resultsBucket.Cursor()
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
store.numRecords++
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// clear removes all results from the db.
|
||||
func (b *missionControlStore) clear() error {
|
||||
return b.db.Update(func(tx *bbolt.Tx) error {
|
||||
if err := tx.DeleteBucket(resultsKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := tx.CreateBucket(resultsKey)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// fetchAll returns all results currently stored in the database.
|
||||
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
|
||||
var results []*paymentResult
|
||||
|
||||
err := b.db.View(func(tx *bbolt.Tx) error {
|
||||
resultBucket := tx.Bucket(resultsKey)
|
||||
results = make([]*paymentResult, 0)
|
||||
|
||||
return resultBucket.ForEach(func(k, v []byte) error {
|
||||
result, err := deserializeResult(k, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
results = append(results, result)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// serializeResult serializes a payment result and returns a key and value byte
|
||||
// slice to insert into the bucket.
|
||||
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
|
||||
// Write timestamps, success status, failure source index and route.
|
||||
var b bytes.Buffer
|
||||
|
||||
var dbFailureSourceIdx int32
|
||||
if rp.failureSourceIdx == nil {
|
||||
dbFailureSourceIdx = unknownFailureSourceIdx
|
||||
} else {
|
||||
dbFailureSourceIdx = int32(*rp.failureSourceIdx)
|
||||
}
|
||||
|
||||
err := channeldb.WriteElements(
|
||||
&b,
|
||||
uint64(rp.timeFwd.UnixNano()),
|
||||
uint64(rp.timeReply.UnixNano()),
|
||||
rp.success, dbFailureSourceIdx,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err := channeldb.SerializeRoute(&b, *rp.route); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Write failure. If there is no failure message, write an empty
|
||||
// byte slice.
|
||||
var failureBytes bytes.Buffer
|
||||
if rp.failure != nil {
|
||||
err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Compose key that identifies this result.
|
||||
key := getResultKey(rp)
|
||||
|
||||
return key, b.Bytes(), nil
|
||||
}
|
||||
|
||||
// deserializeResult deserializes a payment result.
|
||||
func deserializeResult(k, v []byte) (*paymentResult, error) {
|
||||
// Parse payment id.
|
||||
result := paymentResult{
|
||||
id: byteOrder.Uint64(k[8:]),
|
||||
}
|
||||
|
||||
r := bytes.NewReader(v)
|
||||
|
||||
// Read timestamps, success status and failure source index.
|
||||
var (
|
||||
timeFwd, timeReply uint64
|
||||
dbFailureSourceIdx int32
|
||||
)
|
||||
|
||||
err := channeldb.ReadElements(
|
||||
r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Convert time stamps to local time zone for consistent logging.
|
||||
result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
|
||||
result.timeReply = time.Unix(0, int64(timeReply)).Local()
|
||||
|
||||
// Convert from unknown index magic number to nil value.
|
||||
if dbFailureSourceIdx != unknownFailureSourceIdx {
|
||||
failureSourceIdx := int(dbFailureSourceIdx)
|
||||
result.failureSourceIdx = &failureSourceIdx
|
||||
}
|
||||
|
||||
// Read route.
|
||||
route, err := channeldb.DeserializeRoute(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.route = &route
|
||||
|
||||
// Read failure.
|
||||
failureBytes, err := wire.ReadVarBytes(
|
||||
r, 0, lnwire.FailureMessageLength, "failure",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(failureBytes) > 0 {
|
||||
result.failure, err = lnwire.DecodeFailureMessage(
|
||||
bytes.NewReader(failureBytes), 0,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// AddResult adds a new result to the db.
|
||||
func (b *missionControlStore) AddResult(rp *paymentResult) error {
|
||||
return b.db.Update(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket(resultsKey)
|
||||
|
||||
// Prune oldest entries.
|
||||
if b.maxRecords > 0 {
|
||||
for b.numRecords >= b.maxRecords {
|
||||
cursor := bucket.Cursor()
|
||||
cursor.First()
|
||||
if err := cursor.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.numRecords--
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize result into key and value byte slices.
|
||||
k, v, err := serializeResult(rp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The store is assumed to be idempotent. It could be that the
|
||||
// same result is added twice and in that case the counter
|
||||
// shouldn't be increased.
|
||||
if bucket.Get(k) == nil {
|
||||
b.numRecords++
|
||||
}
|
||||
|
||||
// Put into results bucket.
|
||||
return bucket.Put(k, v)
|
||||
})
|
||||
}
|
||||
|
||||
// getResultKey returns a byte slice representing a unique key for this payment
|
||||
// result.
|
||||
func getResultKey(rp *paymentResult) []byte {
|
||||
var keyBytes [8 + 8 + 33]byte
|
||||
|
||||
// Identify records by a combination of time, payment id and sender pub
|
||||
// key. This allows importing mission control data from an external
|
||||
// source without key collisions and keeps the records sorted
|
||||
// chronologically.
|
||||
byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
|
||||
byteOrder.PutUint64(keyBytes[8:], rp.id)
|
||||
copy(keyBytes[16:], rp.route.SourcePubKey[:])
|
||||
|
||||
return keyBytes[:]
|
||||
}
|
140
routing/missioncontrol_store_test.go
Normal file
140
routing/missioncontrol_store_test.go
Normal file
@ -0,0 +1,140 @@
|
||||
package routing
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
)
|
||||
|
||||
const testMaxRecords = 2
|
||||
|
||||
func TestMissionControlStore(t *testing.T) {
|
||||
// Set time zone explictly to keep test deterministic.
|
||||
time.Local = time.UTC
|
||||
|
||||
file, err := ioutil.TempFile("", "*.db")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dbPath := file.Name()
|
||||
|
||||
db, err := bbolt.Open(dbPath, 0600, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
defer os.Remove(dbPath)
|
||||
|
||||
store, err := newMissionControlStore(db, testMaxRecords)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
results, err := store.fetchAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(results) != 0 {
|
||||
t.Fatal("expected no results")
|
||||
}
|
||||
|
||||
testRoute := route.Route{
|
||||
SourcePubKey: route.Vertex{1},
|
||||
Hops: []*route.Hop{
|
||||
{
|
||||
PubKeyBytes: route.Vertex{2},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
failureSourceIdx := 1
|
||||
|
||||
result1 := paymentResult{
|
||||
route: &testRoute,
|
||||
failure: lnwire.NewFailUnknownPaymentHash(100),
|
||||
failureSourceIdx: &failureSourceIdx,
|
||||
id: 99,
|
||||
timeReply: testTime,
|
||||
timeFwd: testTime.Add(-time.Minute),
|
||||
}
|
||||
|
||||
result2 := result1
|
||||
result2.timeReply = result1.timeReply.Add(time.Hour)
|
||||
result2.timeFwd = result1.timeReply.Add(time.Hour)
|
||||
result2.id = 2
|
||||
|
||||
// Store result.
|
||||
err = store.AddResult(&result2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store again to test idempotency.
|
||||
err = store.AddResult(&result2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store second result which has an earlier timestamp.
|
||||
err = store.AddResult(&result1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
results, err = store.fetchAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(results) != 2 {
|
||||
t.Fatal("expected two results")
|
||||
}
|
||||
|
||||
// Check that results are stored in chronological order.
|
||||
if !reflect.DeepEqual(&result1, results[0]) {
|
||||
t.Fatal()
|
||||
}
|
||||
if !reflect.DeepEqual(&result2, results[1]) {
|
||||
t.Fatal()
|
||||
}
|
||||
|
||||
// Recreate store to test pruning.
|
||||
store, err = newMissionControlStore(db, testMaxRecords)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Add a newer result.
|
||||
result3 := result1
|
||||
result3.timeReply = result1.timeReply.Add(2 * time.Hour)
|
||||
result3.timeFwd = result1.timeReply.Add(2 * time.Hour)
|
||||
result3.id = 3
|
||||
|
||||
err = store.AddResult(&result3)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Check that results are pruned.
|
||||
results, err = store.fetchAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(results) != 2 {
|
||||
t.Fatal("expected two results")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(&result2, results[0]) {
|
||||
t.Fatal()
|
||||
}
|
||||
if !reflect.DeepEqual(&result3, results[1]) {
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
@ -1,25 +1,50 @@
|
||||
package routing
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
)
|
||||
|
||||
var (
|
||||
mcTestNode = route.Vertex{}
|
||||
mcTestEdge = EdgeLocator{
|
||||
ChannelID: 123,
|
||||
ChannelID: 2,
|
||||
}
|
||||
mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
|
||||
|
||||
mcTestRoute = &route.Route{
|
||||
SourcePubKey: route.Vertex{10},
|
||||
Hops: []*route.Hop{
|
||||
{
|
||||
ChannelID: 1,
|
||||
PubKeyBytes: route.Vertex{11},
|
||||
AmtToForward: 1000,
|
||||
},
|
||||
{
|
||||
ChannelID: 2,
|
||||
PubKeyBytes: route.Vertex{12},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
|
||||
mcTestNode1 = mcTestRoute.Hops[0].PubKeyBytes
|
||||
mcTestNode2 = mcTestRoute.Hops[1].PubKeyBytes
|
||||
)
|
||||
|
||||
type mcTestContext struct {
|
||||
t *testing.T
|
||||
mc *MissionControl
|
||||
now time.Time
|
||||
|
||||
db *bbolt.DB
|
||||
dbPath string
|
||||
|
||||
pid uint64
|
||||
}
|
||||
|
||||
func createMcTestContext(t *testing.T) *mcTestContext {
|
||||
@ -28,17 +53,44 @@ func createMcTestContext(t *testing.T) *mcTestContext {
|
||||
now: mcTestTime,
|
||||
}
|
||||
|
||||
mc := NewMissionControl(
|
||||
file, err := ioutil.TempFile("", "*.db")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx.dbPath = file.Name()
|
||||
|
||||
ctx.db, err = bbolt.Open(ctx.dbPath, 0600, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx.restartMc()
|
||||
|
||||
return ctx
|
||||
}
|
||||
|
||||
// restartMc creates a new instances of mission control on the same database.
|
||||
func (ctx *mcTestContext) restartMc() {
|
||||
mc, err := NewMissionControl(
|
||||
ctx.db,
|
||||
&MissionControlConfig{
|
||||
PenaltyHalfLife: 30 * time.Minute,
|
||||
AprioriHopProbability: 0.8,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
ctx.t.Fatal(err)
|
||||
}
|
||||
|
||||
mc.now = func() time.Time { return ctx.now }
|
||||
ctx.mc = mc
|
||||
}
|
||||
|
||||
return ctx
|
||||
// cleanup closes the database and removes the temp file.
|
||||
func (ctx *mcTestContext) cleanup() {
|
||||
ctx.db.Close()
|
||||
os.Remove(ctx.dbPath)
|
||||
}
|
||||
|
||||
// Assert that mission control returns a probability for an edge.
|
||||
@ -47,30 +99,41 @@ func (ctx *mcTestContext) expectP(amt lnwire.MilliSatoshi,
|
||||
|
||||
ctx.t.Helper()
|
||||
|
||||
p := ctx.mc.GetEdgeProbability(mcTestNode, mcTestEdge, amt)
|
||||
p := ctx.mc.GetEdgeProbability(mcTestNode1, mcTestEdge, amt)
|
||||
if p != expected {
|
||||
ctx.t.Fatalf("unexpected probability %v", p)
|
||||
}
|
||||
}
|
||||
|
||||
// reportFailure reports a failure by using a test route.
|
||||
func (ctx *mcTestContext) reportFailure(t time.Time,
|
||||
amt lnwire.MilliSatoshi, failure lnwire.FailureMessage) {
|
||||
|
||||
mcTestRoute.Hops[0].AmtToForward = amt
|
||||
|
||||
errorSourceIdx := 1
|
||||
ctx.mc.ReportPaymentFail(
|
||||
ctx.pid, mcTestRoute, &errorSourceIdx, failure,
|
||||
)
|
||||
}
|
||||
|
||||
// TestMissionControl tests mission control probability estimation.
|
||||
func TestMissionControl(t *testing.T) {
|
||||
ctx := createMcTestContext(t)
|
||||
defer ctx.cleanup()
|
||||
|
||||
ctx.now = testTime
|
||||
|
||||
testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
|
||||
|
||||
testNode := route.Vertex{}
|
||||
testEdge := edge{
|
||||
channel: 123,
|
||||
}
|
||||
|
||||
// Initial probability is expected to be 1.
|
||||
ctx.expectP(1000, 0.8)
|
||||
|
||||
// Expect probability to be zero after reporting the edge as failed.
|
||||
ctx.mc.ReportEdgeFailure(testEdge, 1000)
|
||||
ctx.reportFailure(
|
||||
testTime, 1000,
|
||||
lnwire.NewTemporaryChannelFailure(nil),
|
||||
)
|
||||
ctx.expectP(1000, 0)
|
||||
|
||||
// As we reported with a min penalization amt, a lower amt than reported
|
||||
@ -83,7 +146,10 @@ func TestMissionControl(t *testing.T) {
|
||||
|
||||
// Edge fails again, this time without a min penalization amt. The edge
|
||||
// should be penalized regardless of amount.
|
||||
ctx.mc.ReportEdgeFailure(testEdge, 0)
|
||||
ctx.reportFailure(
|
||||
ctx.now, 0,
|
||||
lnwire.NewTemporaryChannelFailure(nil),
|
||||
)
|
||||
ctx.expectP(1000, 0)
|
||||
ctx.expectP(500, 0)
|
||||
|
||||
@ -91,9 +157,16 @@ func TestMissionControl(t *testing.T) {
|
||||
ctx.now = testTime.Add(60 * time.Minute)
|
||||
ctx.expectP(1000, 0.4)
|
||||
|
||||
// Restart mission control to test persistence.
|
||||
ctx.restartMc()
|
||||
ctx.expectP(1000, 0.4)
|
||||
|
||||
// A node level failure should bring probability of every channel back
|
||||
// to zero.
|
||||
ctx.mc.ReportVertexFailure(testNode)
|
||||
ctx.reportFailure(
|
||||
ctx.now, 0,
|
||||
lnwire.NewExpiryTooSoon(lnwire.ChannelUpdate{}),
|
||||
)
|
||||
ctx.expectP(1000, 0)
|
||||
|
||||
// Check whether history snapshot looks sane.
|
||||
@ -111,20 +184,21 @@ func TestMissionControl(t *testing.T) {
|
||||
// penalizing the channel yet.
|
||||
func TestMissionControlChannelUpdate(t *testing.T) {
|
||||
ctx := createMcTestContext(t)
|
||||
|
||||
testEdge := edge{
|
||||
channel: 123,
|
||||
}
|
||||
defer ctx.cleanup()
|
||||
|
||||
// Report a policy related failure. Because it is the first, we don't
|
||||
// expect a penalty.
|
||||
ctx.mc.ReportEdgePolicyFailure(testEdge)
|
||||
|
||||
ctx.reportFailure(
|
||||
ctx.now, 0,
|
||||
lnwire.NewFeeInsufficient(0, lnwire.ChannelUpdate{}),
|
||||
)
|
||||
ctx.expectP(0, 0.8)
|
||||
|
||||
// Report another failure for the same channel. We expect it to be
|
||||
// pruned.
|
||||
ctx.mc.ReportEdgePolicyFailure(testEdge)
|
||||
|
||||
ctx.reportFailure(
|
||||
ctx.now, 0,
|
||||
lnwire.NewFeeInsufficient(0, lnwire.ChannelUpdate{}),
|
||||
)
|
||||
ctx.expectP(0, 0)
|
||||
}
|
||||
|
@ -98,6 +98,13 @@ type mockMissionControl struct {
|
||||
|
||||
var _ MissionController = (*mockMissionControl)(nil)
|
||||
|
||||
func (m *mockMissionControl) ReportPaymentFail(paymentID uint64,
|
||||
rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (
|
||||
bool, channeldb.FailureReason, error) {
|
||||
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge,
|
||||
minPenalizeAmt lnwire.MilliSatoshi) {
|
||||
}
|
||||
|
@ -343,7 +343,7 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID,
|
||||
func (p *paymentLifecycle) handleSendError(sendErr error) error {
|
||||
|
||||
final, reason := p.router.processSendError(
|
||||
&p.attempt.Route, sendErr,
|
||||
p.attempt.PaymentID, &p.attempt.Route, sendErr,
|
||||
)
|
||||
if !final {
|
||||
// Save the forwarding error so it can be returned if
|
||||
|
@ -174,15 +174,13 @@ type PaymentSessionSource interface {
|
||||
// MissionController is an interface that exposes failure reporting and
|
||||
// probability estimation.
|
||||
type MissionController interface {
|
||||
// ReportEdgeFailure reports a channel level failure.
|
||||
ReportEdgeFailure(failedEdge edge,
|
||||
minPenalizeAmt lnwire.MilliSatoshi)
|
||||
|
||||
// ReportEdgePolicyFailure reports a policy related failure.
|
||||
ReportEdgePolicyFailure(failedEdge edge)
|
||||
|
||||
// ReportVertexFailure reports a node level failure.
|
||||
ReportVertexFailure(v route.Vertex)
|
||||
// ReportPaymentFail reports a failed payment to mission control as
|
||||
// input for future probability estimates. It returns a bool indicating
|
||||
// whether this error is a final error and no further payment attempts
|
||||
// need to be made.
|
||||
ReportPaymentFail(paymentID uint64, rt *route.Route,
|
||||
failureSourceIdx *int, failure lnwire.FailureMessage) (bool,
|
||||
channeldb.FailureReason, error)
|
||||
|
||||
// GetEdgeProbability is expected to return the success probability of a
|
||||
// payment from fromNode along edge.
|
||||
@ -1895,21 +1893,33 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
|
||||
// error type, this error is either the final outcome of the payment or we need
|
||||
// to continue with an alternative route. This is indicated by the boolean
|
||||
// return value.
|
||||
func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (
|
||||
bool, channeldb.FailureReason) {
|
||||
func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route,
|
||||
sendErr error) (bool, channeldb.FailureReason) {
|
||||
|
||||
// If the failure message could not be decrypted, attribute the failure
|
||||
// to our own outgoing channel.
|
||||
//
|
||||
// TODO(joostager): Penalize all channels in the route.
|
||||
if sendErr == htlcswitch.ErrUnreadableFailureMessage {
|
||||
sendErr = &htlcswitch.ForwardingError{
|
||||
FailureSourceIdx: 0,
|
||||
FailureMessage: lnwire.NewTemporaryChannelFailure(nil),
|
||||
reportFail := func(srcIdx *int, msg lnwire.FailureMessage) (bool,
|
||||
channeldb.FailureReason) {
|
||||
|
||||
// Report outcome to mission control.
|
||||
final, reason, err := r.cfg.MissionControl.ReportPaymentFail(
|
||||
paymentID, rt, srcIdx, msg,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Error reporting payment result to mc: %v",
|
||||
err)
|
||||
|
||||
return true, channeldb.FailureReasonError
|
||||
}
|
||||
|
||||
return final, reason
|
||||
}
|
||||
|
||||
// If an internal, non-forwarding error occurred, we can stop trying.
|
||||
if sendErr == htlcswitch.ErrUnreadableFailureMessage {
|
||||
log.Tracef("Unreadable failure when sending htlc")
|
||||
|
||||
return reportFail(nil, nil)
|
||||
}
|
||||
// If an internal, non-forwarding error occurred, we can stop
|
||||
// trying.
|
||||
fErr, ok := sendErr.(*htlcswitch.ForwardingError)
|
||||
if !ok {
|
||||
return true, channeldb.FailureReasonError
|
||||
@ -1929,195 +1939,10 @@ func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (
|
||||
}
|
||||
}
|
||||
|
||||
var failureVertex route.Vertex
|
||||
log.Tracef("Node=%v reported failure when sending htlc",
|
||||
failureSourceIdx)
|
||||
|
||||
// For any non-self failure, look up the source pub key in the hops
|
||||
// slice. Otherwise return the self node pubkey.
|
||||
if failureSourceIdx > 0 {
|
||||
failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes
|
||||
} else {
|
||||
failureVertex = r.selfNode.PubKeyBytes
|
||||
}
|
||||
log.Tracef("Node %x (index %v) reported failure when sending htlc",
|
||||
failureVertex, failureSourceIdx)
|
||||
|
||||
// Always determine chan id ourselves, because a channel
|
||||
// update with id may not be available.
|
||||
failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx)
|
||||
|
||||
switch fErr.FailureMessage.(type) {
|
||||
|
||||
// If the end destination didn't know the payment
|
||||
// hash or we sent the wrong payment amount to the
|
||||
// destination, then we'll terminate immediately.
|
||||
case *lnwire.FailUnknownPaymentHash:
|
||||
// TODO(joostjager): Check onionErr.Amount() whether it matches
|
||||
// what we expect. (Will it ever not match, because if not
|
||||
// final_incorrect_htlc_amount would be returned?)
|
||||
|
||||
return true, channeldb.FailureReasonIncorrectPaymentDetails
|
||||
|
||||
// If we sent the wrong amount to the destination, then
|
||||
// we'll exit early.
|
||||
case *lnwire.FailIncorrectPaymentAmount:
|
||||
return true, channeldb.FailureReasonIncorrectPaymentDetails
|
||||
|
||||
// If the time-lock that was extended to the final node
|
||||
// was incorrect, then we can't proceed.
|
||||
case *lnwire.FailFinalIncorrectCltvExpiry:
|
||||
// TODO(joostjager): Take into account that second last hop may
|
||||
// have deliberately handed out an htlc that expires too soon.
|
||||
// In that case we should continue routing.
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// If we crafted an invalid onion payload for the final
|
||||
// node, then we'll exit early.
|
||||
case *lnwire.FailFinalIncorrectHtlcAmount:
|
||||
// TODO(joostjager): Take into account that second last hop may
|
||||
// have deliberately handed out an htlc with a too low value. In
|
||||
// that case we should continue routing.
|
||||
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// Similarly, if the HTLC expiry that we extended to
|
||||
// the final hop expires too soon, then will fail the
|
||||
// payment.
|
||||
//
|
||||
// TODO(roasbeef): can happen to to race condition, try
|
||||
// again with recent block height
|
||||
case *lnwire.FailFinalExpiryTooSoon:
|
||||
// TODO(joostjager): Take into account that any hop may have
|
||||
// delayed. Ideally we should continue routing. Knowing the
|
||||
// delaying node at this point would help.
|
||||
return true, channeldb.FailureReasonIncorrectPaymentDetails
|
||||
|
||||
// If we erroneously attempted to cross a chain border,
|
||||
// then we'll cancel the payment.
|
||||
case *lnwire.FailInvalidRealm:
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// If we get a notice that the expiry was too soon for
|
||||
// an intermediate node, then we'll prune out the node
|
||||
// that sent us this error, as it doesn't now what the
|
||||
// correct block height is.
|
||||
case *lnwire.FailExpiryTooSoon:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If we hit an instance of onion payload corruption or an invalid
|
||||
// version, then we'll exit early as this shouldn't happen in the
|
||||
// typical case.
|
||||
//
|
||||
// TODO(joostjager): Take into account that the previous hop may have
|
||||
// tampered with the onion. Routing should continue using other paths.
|
||||
case *lnwire.FailInvalidOnionVersion:
|
||||
return true, channeldb.FailureReasonError
|
||||
case *lnwire.FailInvalidOnionHmac:
|
||||
return true, channeldb.FailureReasonError
|
||||
case *lnwire.FailInvalidOnionKey:
|
||||
return true, channeldb.FailureReasonError
|
||||
|
||||
// If we get a failure due to violating the minimum
|
||||
// amount, we'll apply the new minimum amount and retry
|
||||
// routing.
|
||||
case *lnwire.FailAmountBelowMinimum:
|
||||
r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
|
||||
return false, 0
|
||||
|
||||
// If we get a failure due to a fee, we'll apply the
|
||||
// new fee update, and retry our attempt using the
|
||||
// newly updated fees.
|
||||
case *lnwire.FailFeeInsufficient:
|
||||
r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
|
||||
return false, 0
|
||||
|
||||
// If we get the failure for an intermediate node that
|
||||
// disagrees with our time lock values, then we'll
|
||||
// apply the new delta value and try it once more.
|
||||
case *lnwire.FailIncorrectCltvExpiry:
|
||||
r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
|
||||
return false, 0
|
||||
|
||||
// The outgoing channel that this node was meant to
|
||||
// forward one is currently disabled, so we'll apply
|
||||
// the update and continue.
|
||||
case *lnwire.FailChannelDisabled:
|
||||
r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
|
||||
return false, 0
|
||||
|
||||
// It's likely that the outgoing channel didn't have
|
||||
// sufficient capacity, so we'll prune this edge for
|
||||
// now, and continue onwards with our path finding.
|
||||
case *lnwire.FailTemporaryChannelFailure:
|
||||
r.cfg.MissionControl.ReportEdgeFailure(failedEdge, failedAmt)
|
||||
return false, 0
|
||||
|
||||
// If the send fail due to a node not having the
|
||||
// required features, then we'll note this error and
|
||||
// continue.
|
||||
case *lnwire.FailRequiredNodeFeatureMissing:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If the send fail due to a node not having the
|
||||
// required features, then we'll note this error and
|
||||
// continue.
|
||||
case *lnwire.FailRequiredChannelFeatureMissing:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If the next hop in the route wasn't known or
|
||||
// offline, we'll only the channel which we attempted
|
||||
// to route over. This is conservative, and it can
|
||||
// handle faulty channels between nodes properly.
|
||||
// Additionally, this guards against routing nodes
|
||||
// returning errors in order to attempt to black list
|
||||
// another node.
|
||||
case *lnwire.FailUnknownNextPeer:
|
||||
r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
|
||||
return false, 0
|
||||
|
||||
// If the node wasn't able to forward for which ever
|
||||
// reason, then we'll note this and continue with the
|
||||
// routes.
|
||||
case *lnwire.FailTemporaryNodeFailure:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
|
||||
case *lnwire.FailPermanentNodeFailure:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If we crafted a route that contains a too long time
|
||||
// lock for an intermediate node, we'll prune the node.
|
||||
// As there currently is no way of knowing that node's
|
||||
// maximum acceptable cltv, we cannot take this
|
||||
// constraint into account during routing.
|
||||
//
|
||||
// TODO(joostjager): Record the rejected cltv and use
|
||||
// that as a hint during future path finding through
|
||||
// that node.
|
||||
case *lnwire.FailExpiryTooFar:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
|
||||
// If we get a permanent channel or node failure, then
|
||||
// we'll prune the channel in both directions and
|
||||
// continue with the rest of the routes.
|
||||
case *lnwire.FailPermanentChannelFailure:
|
||||
r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
|
||||
r.cfg.MissionControl.ReportEdgeFailure(edge{
|
||||
from: failedEdge.to,
|
||||
to: failedEdge.from,
|
||||
channel: failedEdge.channel,
|
||||
}, 0)
|
||||
return false, 0
|
||||
|
||||
// Any other failure or an empty failure will get the node pruned.
|
||||
default:
|
||||
r.cfg.MissionControl.ReportVertexFailure(failureVertex)
|
||||
return false, 0
|
||||
}
|
||||
return reportFail(&failureSourceIdx, failureMessage)
|
||||
}
|
||||
|
||||
// extractChannelUpdate examines the error and extracts the channel update.
|
||||
@ -2143,46 +1968,6 @@ func (r *ChannelRouter) extractChannelUpdate(
|
||||
return update
|
||||
}
|
||||
|
||||
// getFailedEdge tries to locate the failing channel given a route and the
|
||||
// pubkey of the node that sent the failure. It will assume that the failure is
|
||||
// associated with the outgoing channel of the failing node. As a second result,
|
||||
// it returns the amount sent over the edge.
|
||||
func getFailedEdge(route *route.Route, failureSource int) (edge,
|
||||
lnwire.MilliSatoshi) {
|
||||
|
||||
// Determine if we have a failure from the final hop. If it is, we
|
||||
// assume that the failing channel is the incoming channel. In this
|
||||
// function the outgoing channel of the hop indicated by failureSource
|
||||
// is returned, where index zero is the self node. By decrementing
|
||||
// failureSource by one, the outgoing channel of the penultimate hop is
|
||||
// returned, which is the same as the incoming channel of the final
|
||||
// node.
|
||||
//
|
||||
// TODO(joostjager): In this case, certain types of failures are not
|
||||
// expected. For example FailUnknownNextPeer. This could be a reason to
|
||||
// prune the node?
|
||||
if failureSource == len(route.Hops) {
|
||||
failureSource--
|
||||
}
|
||||
|
||||
// As this failure indicates that the target channel was unable to carry
|
||||
// this HTLC (for w/e reason), we'll return the _outgoing_ channel that
|
||||
// the source of the failure was meant to pass the HTLC along to.
|
||||
if failureSource == 0 {
|
||||
return edge{
|
||||
from: route.SourcePubKey,
|
||||
to: route.Hops[0].PubKeyBytes,
|
||||
channel: route.Hops[0].ChannelID,
|
||||
}, route.TotalAmount
|
||||
}
|
||||
|
||||
return edge{
|
||||
from: route.Hops[failureSource-1].PubKeyBytes,
|
||||
to: route.Hops[failureSource].PubKeyBytes,
|
||||
channel: route.Hops[failureSource].ChannelID,
|
||||
}, route.Hops[failureSource-1].AmtToForward
|
||||
}
|
||||
|
||||
// applyChannelUpdate validates a channel update and if valid, applies it to the
|
||||
// database. It returns a bool indicating whether the updates was successful.
|
||||
func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate,
|
||||
|
@ -90,7 +90,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
pathFindingConfig := &PathFindingConfig{
|
||||
pathFindingConfig := PathFindingConfig{
|
||||
MinProbability: 0.01,
|
||||
PaymentAttemptPenalty: 100,
|
||||
}
|
||||
@ -100,9 +100,13 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
|
||||
AprioriHopProbability: 0.9,
|
||||
}
|
||||
|
||||
mc := NewMissionControl(
|
||||
mc, err := NewMissionControl(
|
||||
graphInstance.graph.Database().DB,
|
||||
mcConfig,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
sessionSource := &SessionSource{
|
||||
Graph: graphInstance.graph,
|
||||
@ -110,11 +114,8 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
|
||||
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
|
||||
return lnwire.NewMSatFromSatoshis(e.Capacity)
|
||||
},
|
||||
PathFindingConfig: PathFindingConfig{
|
||||
MinProbability: 0.01,
|
||||
PaymentAttemptPenalty: 100,
|
||||
},
|
||||
MissionControl: mc,
|
||||
PathFindingConfig: pathFindingConfig,
|
||||
MissionControl: mc,
|
||||
}
|
||||
|
||||
router, err := New(Config{
|
||||
@ -134,7 +135,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
|
||||
next := atomic.AddUint64(&uniquePaymentID, 1)
|
||||
return next, nil
|
||||
},
|
||||
PathFindingConfig: *pathFindingConfig,
|
||||
PathFindingConfig: pathFindingConfig,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to create router %v", err)
|
||||
|
15
server.go
15
server.go
@ -653,21 +653,28 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
||||
// servers, the mission control instance itself can be moved there too.
|
||||
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
|
||||
|
||||
s.missionControl = routing.NewMissionControl(
|
||||
s.missionControl, err = routing.NewMissionControl(
|
||||
chanDB.DB,
|
||||
&routing.MissionControlConfig{
|
||||
AprioriHopProbability: routingConfig.AprioriHopProbability,
|
||||
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
|
||||
MaxMcHistory: routingConfig.MaxMcHistory,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create mission control: %v", err)
|
||||
}
|
||||
|
||||
srvrLog.Debugf("Instantiating payment session source with config: "+
|
||||
"PaymentAttemptPenalty=%v, MinRouteProbability=%v",
|
||||
int64(routingConfig.PaymentAttemptPenalty.ToSatoshis()),
|
||||
int64(routingConfig.AttemptCost),
|
||||
routingConfig.MinRouteProbability)
|
||||
|
||||
pathFindingConfig := routing.PathFindingConfig{
|
||||
PaymentAttemptPenalty: routingConfig.PaymentAttemptPenalty,
|
||||
MinProbability: routingConfig.MinRouteProbability,
|
||||
PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis(
|
||||
routingConfig.AttemptCost,
|
||||
),
|
||||
MinProbability: routingConfig.MinRouteProbability,
|
||||
}
|
||||
|
||||
paymentSessionSource := &routing.SessionSource{
|
||||
|
Loading…
Reference in New Issue
Block a user