routing/router: persist payment state machine

This commit makes the router use the ControlTower to drive the payment
life cycle state machine, to keep track of active payments across
restarts.  This lets the router resume payments on startup, such that
their final results can be handled and stored when ready.
This commit is contained in:
Johan T. Halseth 2019-05-23 20:05:29 +02:00
parent dd73c51a34
commit de1bf8a518
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
6 changed files with 198 additions and 56 deletions

@ -573,6 +573,48 @@ func fetchPayment(bucket *bbolt.Bucket) (*Payment, error) {
return p, nil return p, nil
} }
// DeletePayments deletes all completed and failed payments from the DB.
func (db *DB) DeletePayments() error {
return db.Update(func(tx *bbolt.Tx) error {
payments := tx.Bucket(paymentsRootBucket)
if payments == nil {
return nil
}
var deleteBuckets [][]byte
err := payments.ForEach(func(k, _ []byte) error {
bucket := payments.Bucket(k)
if bucket == nil {
// We only expect sub-buckets to be found in
// this top-level bucket.
return fmt.Errorf("non bucket element in " +
"payments bucket")
}
// If the status is InFlight, we cannot safely delete
// the payment information, so we return early.
paymentStatus := fetchPaymentStatus(bucket)
if paymentStatus == StatusInFlight {
return nil
}
deleteBuckets = append(deleteBuckets, k)
return nil
})
if err != nil {
return err
}
for _, k := range deleteBuckets {
if err := payments.DeleteBucket(k); err != nil {
return err
}
}
return nil
})
}
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error { func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
var scratch [8]byte var scratch [8]byte

@ -147,6 +147,15 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
log.Debugf("Payment %x succeeded with pid=%v", log.Debugf("Payment %x succeeded with pid=%v",
p.payment.PaymentHash, p.attempt.PaymentID) p.payment.PaymentHash, p.attempt.PaymentID)
// In case of success we atomically store the db payment and
// move the payment to the success state.
err = p.router.cfg.Control.Success(p.payment.PaymentHash, result.Preimage)
if err != nil {
log.Errorf("Unable to succeed payment "+
"attempt: %v", err)
return [32]byte{}, nil, err
}
// Terminal state, return the preimage and the route // Terminal state, return the preimage and the route
// taken. // taken.
return result.Preimage, &p.attempt.Route, nil return result.Preimage, &p.attempt.Route, nil
@ -165,6 +174,15 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
// attempt short. // attempt short.
select { select {
case <-p.timeoutChan: case <-p.timeoutChan:
// Mark the payment as failed because of the
// timeout.
err := p.router.cfg.Control.Fail(
p.payment.PaymentHash,
)
if err != nil {
return lnwire.ShortChannelID{}, nil, err
}
errStr := fmt.Sprintf("payment attempt not completed " + errStr := fmt.Sprintf("payment attempt not completed " +
"before timeout") "before timeout")
@ -186,6 +204,16 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
p.payment, uint32(p.currentHeight), p.finalCLTVDelta, p.payment, uint32(p.currentHeight), p.finalCLTVDelta,
) )
if err != nil { if err != nil {
// If we're unable to successfully make a payment using
// any of the routes we've found, then mark the payment
// as permanently failed.
saveErr := p.router.cfg.Control.Fail(
p.payment.PaymentHash,
)
if saveErr != nil {
return lnwire.ShortChannelID{}, nil, saveErr
}
// If there was an error already recorded for this // If there was an error already recorded for this
// payment, we'll return that. // payment, we'll return that.
if p.lastError != nil { if p.lastError != nil {
@ -250,6 +278,17 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
Route: *route, Route: *route,
} }
// Before sending this HTLC to the switch, we checkpoint the
// fresh paymentID and route to the DB. This lets us know on
// startup the ID of the payment that we attempted to send,
// such that we can query the Switch for its whereabouts. The
// route is needed to handle the result when it eventually
// comes back.
err = p.router.cfg.Control.RegisterAttempt(p.payment.PaymentHash, p.attempt)
if err != nil {
return lnwire.ShortChannelID{}, nil, err
}
return firstHop, htlcAdd, nil return firstHop, htlcAdd, nil
} }
@ -295,6 +334,16 @@ func (p *paymentLifecycle) handleSendError(sendErr error) error {
log.Errorf("Payment %x failed with final outcome: %v", log.Errorf("Payment %x failed with final outcome: %v",
p.payment.PaymentHash, sendErr) p.payment.PaymentHash, sendErr)
// Mark the payment failed with no route.
// TODO(halseth): make payment codes for the actual reason we
// don't continue path finding.
err := p.router.cfg.Control.Fail(
p.payment.PaymentHash,
)
if err != nil {
return err
}
// Terminal state, return the error we encountered. // Terminal state, return the error we encountered.
return sendErr return sendErr
} }

@ -199,6 +199,10 @@ type Config struct {
// their results. // their results.
Payer PaymentAttemptDispatcher Payer PaymentAttemptDispatcher
// Control keeps track of the status of ongoing payments, ensuring we
// can properly resume them across restarts.
Control channeldb.ControlTower
// ChannelPruneExpiry is the duration used to determine if a channel // ChannelPruneExpiry is the duration used to determine if a channel
// should be pruned or not. If the delta between now and when the // should be pruned or not. If the delta between now and when the
// channel was last updated is greater than ChannelPruneExpiry, then // channel was last updated is greater than ChannelPruneExpiry, then
@ -1551,7 +1555,23 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *route
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
return r.sendPayment(payment, paySession) // Record this payment hash with the ControlTower, ensuring it is not
// already in-flight.
info := &channeldb.PaymentCreationInfo{
PaymentHash: payment.PaymentHash,
Value: payment.Amount,
CreationDate: time.Now(),
PaymentRequest: nil,
}
err = r.cfg.Control.InitPayment(payment.PaymentHash, info)
if err != nil {
return [32]byte{}, nil, err
}
// Since this is the first time this payment is being made, we pass nil
// for the existing attempt.
return r.sendPayment(nil, payment, paySession)
} }
// SendToRoute attempts to send a payment with the given hash through the // SendToRoute attempts to send a payment with the given hash through the
@ -1569,8 +1589,23 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
PaymentHash: hash, PaymentHash: hash,
} }
preimage, _, err := r.sendPayment(payment, paySession) // Record this payment hash with the ControlTower, ensuring it is not
// already in-flight.
info := &channeldb.PaymentCreationInfo{
PaymentHash: payment.PaymentHash,
Value: payment.Amount,
CreationDate: time.Now(),
PaymentRequest: nil,
}
err := r.cfg.Control.InitPayment(payment.PaymentHash, info)
if err != nil {
return [32]byte{}, err
}
// Since this is the first time this payment is being made, we pass nil
// for the existing attempt.
preimage, _, err := r.sendPayment(nil, payment, paySession)
return preimage, err return preimage, err
} }
@ -1581,8 +1616,19 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
// will be returned which describes the path the successful payment traversed // will be returned which describes the path the successful payment traversed
// within the network to reach the destination. Additionally, the payment // within the network to reach the destination. Additionally, the payment
// preimage will also be returned. // preimage will also be returned.
func (r *ChannelRouter) sendPayment(payment *LightningPayment, //
paySession *paymentSession) ([32]byte, *route.Route, error) { // The existing attempt argument should be set to nil if this is a payment that
// haven't had any payment attempt sent to the switch yet. If it has had an
// attempt already, it should be passed such that the result can be retrieved.
//
// This method relies on the ControlTower's internal payment state machine to
// carry out its execution. After restarts it is safe, and assumed, that the
// router will call this method for every payment still in-flight according to
// the ControlTower.
func (r *ChannelRouter) sendPayment(
existingAttempt *channeldb.PaymentAttemptInfo,
payment *LightningPayment, paySession *paymentSession) (
[32]byte, *route.Route, error) {
log.Tracef("Dispatching route for lightning payment: %v", log.Tracef("Dispatching route for lightning payment: %v",
newLogClosure(func() string { newLogClosure(func() string {
@ -1627,6 +1673,7 @@ func (r *ChannelRouter) sendPayment(payment *LightningPayment,
timeoutChan: timeoutChan, timeoutChan: timeoutChan,
currentHeight: currentHeight, currentHeight: currentHeight,
finalCLTVDelta: finalCLTVDelta, finalCLTVDelta: finalCLTVDelta,
attempt: existingAttempt,
circuit: nil, circuit: nil,
lastError: nil, lastError: nil,
} }

@ -50,6 +50,7 @@ func (c *testCtx) RestartRouter() error {
Chain: c.chain, Chain: c.chain,
ChainView: c.chainView, ChainView: c.chainView,
Payer: &mockPaymentAttemptDispatcher{}, Payer: &mockPaymentAttemptDispatcher{},
Control: makeMockControlTower(),
ChannelPruneExpiry: time.Hour * 24, ChannelPruneExpiry: time.Hour * 24,
GraphPruneInterval: time.Hour * 2, GraphPruneInterval: time.Hour * 2,
}) })
@ -88,6 +89,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
Chain: chain, Chain: chain,
ChainView: chainView, ChainView: chainView,
Payer: &mockPaymentAttemptDispatcher{}, Payer: &mockPaymentAttemptDispatcher{},
Control: makeMockControlTower(),
ChannelPruneExpiry: time.Hour * 24, ChannelPruneExpiry: time.Hour * 24,
GraphPruneInterval: time.Hour * 2, GraphPruneInterval: time.Hour * 2,
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
@ -1528,6 +1530,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) {
Chain: ctx.chain, Chain: ctx.chain,
ChainView: ctx.chainView, ChainView: ctx.chainView,
Payer: &mockPaymentAttemptDispatcher{}, Payer: &mockPaymentAttemptDispatcher{},
Control: makeMockControlTower(),
ChannelPruneExpiry: time.Hour * 24, ChannelPruneExpiry: time.Hour * 24,
GraphPruneInterval: time.Hour * 2, GraphPruneInterval: time.Hour * 2,
}) })
@ -2490,3 +2493,30 @@ func assertChannelsPruned(t *testing.T, graph *channeldb.ChannelGraph,
} }
} }
} }
type mockControlTower struct{}
var _ channeldb.ControlTower = (*mockControlTower)(nil)
func makeMockControlTower() *mockControlTower {
return &mockControlTower{}
}
func (m *mockControlTower) InitPayment(lntypes.Hash,
*channeldb.PaymentCreationInfo) error {
return nil
}
func (m *mockControlTower) RegisterAttempt(lntypes.Hash,
*channeldb.PaymentAttemptInfo) error {
return nil
}
func (m *mockControlTower) Success(paymentHash lntypes.Hash,
preimg lntypes.Preimage) error {
return nil
}
func (m *mockControlTower) Fail(paymentHash lntypes.Hash) error {
return nil
}

@ -2,7 +2,6 @@ package lnd
import ( import (
"bytes" "bytes"
"crypto/sha256"
"crypto/tls" "crypto/tls"
"encoding/hex" "encoding/hex"
"errors" "errors"
@ -2735,33 +2734,6 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
} }
} }
// savePayment saves a successfully completed payment to the database for
// historical record keeping.
func (r *rpcServer) savePayment(route *route.Route,
amount lnwire.MilliSatoshi, preImage []byte) error {
paymentPath := make([][33]byte, len(route.Hops))
for i, hop := range route.Hops {
hopPub := hop.PubKeyBytes
copy(paymentPath[i][:], hopPub[:])
}
payment := &channeldb.OutgoingPayment{
Invoice: channeldb.Invoice{
Terms: channeldb.ContractTerm{
Value: amount,
},
CreationDate: time.Now(),
},
Path: paymentPath,
Fee: route.TotalFees(),
TimeLockLength: route.TotalTimeLock,
}
copy(payment.PaymentPreimage[:], preImage)
return r.server.chanDB.AddPayment(payment)
}
// validatePayReqExpiry checks if the passed payment request has expired. In // validatePayReqExpiry checks if the passed payment request has expired. In
// the case it has expired, an error will be returned. // the case it has expired, an error will be returned.
func validatePayReqExpiry(payReq *zpay32.Invoice) error { func validatePayReqExpiry(payReq *zpay32.Invoice) error {
@ -3123,18 +3095,6 @@ func (r *rpcServer) dispatchPaymentIntent(
}, nil }, nil
} }
// Calculate amount paid to receiver.
amt := route.TotalAmount - route.TotalFees()
// Save the completed payment to the database for record keeping
// purposes.
err := r.savePayment(route, amt, preImage[:])
if err != nil {
// We weren't able to save the payment, so we return the save
// err, but a nil routing err.
return nil, err
}
return &paymentIntentResponse{ return &paymentIntentResponse{
Route: route, Route: route,
Preimage: preImage, Preimage: preImage,
@ -4146,8 +4106,8 @@ func (r *rpcServer) ListPayments(ctx context.Context,
rpcsLog.Debugf("[ListPayments]") rpcsLog.Debugf("[ListPayments]")
payments, err := r.server.chanDB.FetchAllPayments() payments, err := r.server.chanDB.FetchPayments()
if err != nil && err != channeldb.ErrNoPaymentsCreated { if err != nil {
return nil, err return nil, err
} }
@ -4155,24 +4115,37 @@ func (r *rpcServer) ListPayments(ctx context.Context,
Payments: make([]*lnrpc.Payment, len(payments)), Payments: make([]*lnrpc.Payment, len(payments)),
} }
for i, payment := range payments { for i, payment := range payments {
path := make([]string, len(payment.Path)) // If a payment attempt has been made we can fetch the route.
for i, hop := range payment.Path { // Otherwise we'll just populate the RPC response with an empty
path[i] = hex.EncodeToString(hop[:]) // one.
var route route.Route
if payment.Attempt != nil {
route = payment.Attempt.Route
}
path := make([]string, len(route.Hops))
for i, hop := range route.Hops {
path[i] = hex.EncodeToString(hop.PubKeyBytes[:])
} }
msatValue := int64(payment.Terms.Value) // If this payment is settled, the preimage will be available.
satValue := int64(payment.Terms.Value.ToSatoshis()) var preimage lntypes.Preimage
if payment.PaymentPreimage != nil {
preimage = *payment.PaymentPreimage
}
paymentHash := sha256.Sum256(payment.PaymentPreimage[:]) msatValue := int64(payment.Info.Value)
satValue := int64(payment.Info.Value.ToSatoshis())
paymentHash := payment.Info.PaymentHash
paymentsResp.Payments[i] = &lnrpc.Payment{ paymentsResp.Payments[i] = &lnrpc.Payment{
PaymentHash: hex.EncodeToString(paymentHash[:]), PaymentHash: hex.EncodeToString(paymentHash[:]),
Value: satValue, Value: satValue,
ValueMsat: msatValue, ValueMsat: msatValue,
ValueSat: satValue, ValueSat: satValue,
CreationDate: payment.CreationDate.Unix(), CreationDate: payment.Info.CreationDate.Unix(),
Path: path, Path: path,
Fee: int64(payment.Fee.ToSatoshis()), Fee: int64(route.TotalFees().ToSatoshis()),
PaymentPreimage: hex.EncodeToString(payment.PaymentPreimage[:]), PaymentPreimage: hex.EncodeToString(preimage[:]),
} }
} }
@ -4185,7 +4158,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context,
rpcsLog.Debugf("[DeleteAllPayments]") rpcsLog.Debugf("[DeleteAllPayments]")
if err := r.server.chanDB.DeleteAllPayments(); err != nil { if err := r.server.chanDB.DeletePayments(); err != nil {
return nil, err return nil, err
} }

@ -621,6 +621,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
Chain: cc.chainIO, Chain: cc.chainIO,
ChainView: cc.chainView, ChainView: cc.chainView,
Payer: s.htlcSwitch, Payer: s.htlcSwitch,
Control: channeldb.NewPaymentControl(chanDB),
ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, ChannelPruneExpiry: routing.DefaultChannelPruneExpiry,
GraphPruneInterval: time.Duration(time.Hour), GraphPruneInterval: time.Duration(time.Hour),
QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {