From 7296cfb425ce6aa14320bb9291be4b370a2e56a3 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:12:37 +0300 Subject: [PATCH 01/32] channeldb: add payment statuses: ground, in flight, completed --- channeldb/payments.go | 96 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/channeldb/payments.go b/channeldb/payments.go index 0e5f47b9..aa880b0b 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "encoding/binary" + "errors" "io" "github.com/coreos/bbolt" @@ -17,8 +18,64 @@ var ( // which is a monotonically increasing uint64. BoltDB's sequence // feature is used for generating monotonically increasing id. paymentBucket = []byte("payments") + + // paymentStatusBucket is the name of the bucket within the database that + // stores the status of a payment indexed by the payment's preimage. + paymentStatusBucket = []byte("payment-status") ) +// PaymentStatus represent current status of payment +type PaymentStatus byte + +const ( + // StatusGrounded is status where payment is initiated and received + // an intermittent failure + StatusGrounded PaymentStatus = 0 + + // StatusInFlight is status where payment is initiated, but a response + // has not been received + StatusInFlight PaymentStatus = 1 + + // StatusCompleted is status where payment is initiated and complete + // a payment successfully + StatusCompleted PaymentStatus = 2 +) + +// Bytes returns status as slice of bytes +func (ps PaymentStatus) Bytes() []byte { + return []byte{byte(ps)} +} + +// FromBytes sets status from slice of bytes +func (ps *PaymentStatus) FromBytes(status []byte) error { + if len(status) != 1 { + return errors.New("payment status is empty") + } + + switch PaymentStatus(status[0]) { + case StatusGrounded, StatusInFlight, StatusCompleted: + *ps = PaymentStatus(status[0]) + default: + return errors.New("unknown payment status") + } + + return nil +} + +// String returns readable representation of payment status +func (ps PaymentStatus) String() string { + switch ps { + case StatusGrounded: + return "Grounded" + case StatusInFlight: + return "In Flight" + case StatusCompleted: + return "Completed" + default: + return "Unknown" + } +} + // OutgoingPayment represents a successful payment between the daemon and a // remote node. Details such as the total fee paid, and the time of the payment // are stored. @@ -129,6 +186,45 @@ func (db *DB) DeleteAllPayments() error { }) } +// UpdatePaymentStatus sets status for outgoing/finished payment to store status in +// local database. +func (db *DB) UpdatePaymentStatus(paymentHash [32]byte, status PaymentStatus) error { + return db.Batch(func(tx *bolt.Tx) error { + paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) + if err != nil { + return err + } + + return paymentStatuses.Put(paymentHash[:], status.Bytes()) + }) +} + +// FetchPaymentStatus returns payment status for outgoing payment +// if status of the payment isn't found it set to default status "StatusGrounded". +func (db *DB) FetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) { + // default status for all payments that wasn't recorded in database + paymentStatus := StatusGrounded + + err := db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(paymentStatusBucket) + if bucket == nil { + return nil + } + + paymentStatusBytes := bucket.Get(paymentHash[:]) + if paymentStatusBytes == nil { + return nil + } + + return paymentStatus.FromBytes(paymentStatusBytes) + }) + if err != nil { + return StatusGrounded, err + } + + return paymentStatus, nil +} + func serializeOutgoingPayment(w io.Writer, p *OutgoingPayment) error { var scratch [8]byte From 7f6fbd4533fb65b1e0380b4584647503f654a7e2 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:13:59 +0300 Subject: [PATCH 02/32] channeldb: test of payment statuses transitions --- channeldb/payments_test.go | 56 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/channeldb/payments_test.go b/channeldb/payments_test.go index 450b4acf..d13e039d 100644 --- a/channeldb/payments_test.go +++ b/channeldb/payments_test.go @@ -40,6 +40,14 @@ func makeFakePayment() *OutgoingPayment { return fakePayment } +func makeFakePaymentHash() [32]byte { + var paymentHash [32]byte + rBytes, _ := randomBytes(0, 32) + copy(paymentHash[:], rBytes) + + return paymentHash +} + // randomBytes creates random []byte with length in range [minLen, maxLen) func randomBytes(minLen, maxLen int) ([]byte, error) { randBuf := make([]byte, minLen+rand.Intn(maxLen-minLen)) @@ -195,3 +203,51 @@ func TestOutgoingPaymentWorkflow(t *testing.T) { len(paymentsAfterDeletion), 0) } } + +func TestPaymentStatusWorkflow(t *testing.T) { + t.Parallel() + + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to make test db: %v", err) + } + + testCases := []struct { + paymentHash [32]byte + status PaymentStatus + }{ + { + paymentHash: makeFakePaymentHash(), + status: StatusGrounded, + }, + { + paymentHash: makeFakePaymentHash(), + status: StatusInFlight, + }, + { + paymentHash: makeFakePaymentHash(), + status: StatusCompleted, + }, + } + + for _, testCase := range testCases { + err := db.UpdatePaymentStatus(testCase.paymentHash, testCase.status) + if err != nil { + t.Fatalf("unable to put payment in DB: %v", err) + } + + status, err := db.FetchPaymentStatus(testCase.paymentHash) + if err != nil { + t.Fatalf("unable to fetch payments from DB: %v", err) + } + + if status != testCase.status { + t.Fatalf("Wrong payments status after reading from DB."+ + "Got %v, want %v", + spew.Sdump(status), + spew.Sdump(testCase.status), + ) + } + } +} From f405376b8b8829e4a8241e7e4d0638cc3490b95b Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:15:24 +0300 Subject: [PATCH 03/32] channeldb: add migration for payment statuses for completed payments --- channeldb/migrations.go | 44 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/channeldb/migrations.go b/channeldb/migrations.go index e8b658ed..60e58fb6 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -2,6 +2,7 @@ package channeldb import ( "bytes" + "crypto/sha256" "fmt" "github.com/coreos/bbolt" @@ -373,3 +374,46 @@ func migrateEdgePolicies(tx *bolt.Tx) error { return nil } + +// paymentStatusesMigration is a database migration intended for adding payment +// statuses for each existing payment entity in bucket to be able control +// transitions of statuses and prevent cases such as double payment +func paymentStatusesMigration(tx *bolt.Tx) error { + // Get the bucket dedicated to storing payments + bucket := tx.Bucket(paymentBucket) + if bucket == nil { + return nil + } + + // Get the bucket dedicated to storing statuses of payments, + // where a key is payment hash, value is payment status + paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) + if err != nil { + return err + } + + log.Infof("Migration database adds to all existing payments " + + "statuses as Completed") + + // For each payment in the bucket, fetch all data. + return bucket.ForEach(func(k, v []byte) error { + // Ignores if it is sub-bucket. + if v == nil { + return nil + } + + r := bytes.NewReader(v) + payment, err := deserializeOutgoingPayment(r) + if err != nil { + return err + } + + // Calculate payment hash for current payment. + paymentHash := sha256.Sum256(payment.PaymentPreimage[:]) + + // Tries to update status for current payment to completed + // if it fails - migration abort transaction and return payment bucket + // to previous state. + return paymentStatuses.Put(paymentHash[:], StatusCompleted.Bytes()) + }) +} \ No newline at end of file From 24e3310f1330c3104bf9fe9b89d744512895afb9 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:16:21 +0300 Subject: [PATCH 04/32] channeldb: payment statuses migration test --- channeldb/migrations_test.go | 71 ++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 channeldb/migrations_test.go diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go new file mode 100644 index 00000000..6adf76c1 --- /dev/null +++ b/channeldb/migrations_test.go @@ -0,0 +1,71 @@ +package channeldb + +import ( + "crypto/sha256" + "testing" +) + +func TestPaymentStatusesMigration(t *testing.T) { + t.Parallel() + + fakePayment := makeFakePayment() + paymentHash := sha256.Sum256(fakePayment.PaymentPreimage[:]) + + // Add fake payment to the test database and verifies that it was created + // and there is only one payment and its status is not "Completed". + beforeMigrationFunc := func(d *DB) { + if err := d.AddPayment(fakePayment); err != nil { + t.Fatalf("unable to add payment: %v", err) + } + + payments, err := d.FetchAllPayments() + if err != nil { + t.Fatalf("unable to fetch payments: %v", err) + } + + if len(payments) != 1 { + t.Fatalf("wrong qty of paymets: expected 1, got %v", + len(payments)) + } + + paymentStatus, err := d.FetchPaymentStatus(paymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + // We should receive default status if we have any in database. + if paymentStatus != StatusGrounded { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusGrounded.String(), paymentStatus.String()) + } + } + + // Verify that was created payment status "Completed" for our one fake + // payment. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.DbVersionNumber != 1 { + t.Fatal("migration 'paymentStatusesMigration' wasn't applied") + } + + paymentStatus, err := d.FetchPaymentStatus(paymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if paymentStatus != StatusCompleted { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusCompleted.String(), paymentStatus.String()) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + paymentStatusesMigration, + false) +} From 41e31e0909aa3e49706f492056956e481f61770b Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:17:10 +0300 Subject: [PATCH 05/32] channeldb: applying payment statuses migration to current database --- channeldb/db.go | 6 +++++ channeldb/db_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ channeldb/meta_test.go | 53 ---------------------------------------- 3 files changed, 61 insertions(+), 53 deletions(-) diff --git a/channeldb/db.go b/channeldb/db.go index 0035e983..c90d4885 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -67,6 +67,12 @@ var ( number: 4, migration: migrateEdgePolicies, }, + { + // The version with added payment statuses + // for each existing payment + number: 5, + migration: paymentStatusesMigration, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/db_test.go b/channeldb/db_test.go index f3e3c96e..7edae3c5 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -5,6 +5,8 @@ import ( "os" "path/filepath" "testing" + + "github.com/go-errors/errors" ) func TestOpenWithCreate(t *testing.T) { @@ -33,3 +35,56 @@ func TestOpenWithCreate(t *testing.T) { t.Fatalf("channeldb failed to create data directory") } } + +// applyMigration is a helper test function that encapsulates the general steps +// which are needed to properly check the result of applying migration function. +func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), + migrationFunc migration, shouldFail bool) { + + cdb, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatal(err) + } + + // beforeMigration usually used for populating the database + // with test data. + beforeMigration(cdb) + + // Create test meta info with zero database version and put it on disk. + // Than creating the version list pretending that new version was added. + meta := &Meta{DbVersionNumber: 0} + if err := cdb.PutMeta(meta); err != nil { + t.Fatalf("unable to store meta data: %v", err) + } + + versions := []version{ + { + number: 0, + migration: nil, + }, + { + number: 1, + migration: migrationFunc, + }, + } + + defer func() { + if r := recover(); r != nil { + err = errors.New(r) + } + + if err == nil && shouldFail { + t.Fatal("error wasn't received on migration stage") + } else if err != nil && !shouldFail { + t.Fatal("error was received on migration stage") + } + + // afterMigration usually used for checking the database state and + // throwing the error if something went wrong. + afterMigration(cdb) + }() + + // Sync with the latest version - applying migration function. + err = cdb.syncVersions(versions) +} diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index dbed0a2b..28ced7d8 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -118,59 +118,6 @@ func TestGlobalVersionList(t *testing.T) { } } -// applyMigration is a helper test function that encapsulates the general steps -// which are needed to properly check the result of applying migration function. -func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), - migrationFunc migration, shouldFail bool) { - - cdb, cleanUp, err := makeTestDB() - defer cleanUp() - if err != nil { - t.Fatal(err) - } - - // beforeMigration usually used for populating the database - // with test data. - beforeMigration(cdb) - - // Create test meta info with zero database version and put it on disk. - // Than creating the version list pretending that new version was added. - meta := &Meta{DbVersionNumber: 0} - if err := cdb.PutMeta(meta); err != nil { - t.Fatalf("unable to store meta data: %v", err) - } - - versions := []version{ - { - number: 0, - migration: nil, - }, - { - number: 1, - migration: migrationFunc, - }, - } - - defer func() { - if r := recover(); r != nil { - err = errors.New(r) - } - - if err == nil && shouldFail { - t.Fatal("error wasn't received on migration stage") - } else if err != nil && !shouldFail { - t.Fatal("error was received on migration stage") - } - - // afterMigration usually used for checking the database state and - // throwing the error if something went wrong. - afterMigration(cdb) - }() - - // Sync with the latest version - applying migration function. - err = cdb.syncVersions(versions) -} - func TestMigrationWithPanic(t *testing.T) { t.Parallel() From a3d32be808c6eb0248dfd1e272c1907132fa0541 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:18:35 +0300 Subject: [PATCH 06/32] htlcswitch: control tower implementation --- htlcswitch/switch_control.go | 142 +++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 htlcswitch/switch_control.go diff --git a/htlcswitch/switch_control.go b/htlcswitch/switch_control.go new file mode 100644 index 00000000..d62b8872 --- /dev/null +++ b/htlcswitch/switch_control.go @@ -0,0 +1,142 @@ +package htlcswitch + +import ( + "errors" + "sync" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // ErrAlreadyPaid is used when we have already paid + ErrAlreadyPaid = errors.New("invoice was already paid") + + // ErrPaymentInFlight returns in case if payment is already "in flight" + ErrPaymentInFlight = errors.New("payment is in transition") + + // ErrPaymentNotInitiated returns in case if payment wasn't initiated + // in switch + ErrPaymentNotInitiated = errors.New("payment isn't initiated") + + // ErrPaymentAlreadyCompleted returns in case of attempt to complete + // completed payment + ErrPaymentAlreadyCompleted = errors.New("payment is already completed") +) + +// ControlTower is a controller interface of sending HTLC messages to switch +type ControlTower interface { + // CheckSend intercepts incoming message to provide checks + // and fail if specific message is not allowed by implementation + CheckSend(htlc *lnwire.UpdateAddHTLC) error + + // Success marks message transition as successful + Success(paymentHash [32]byte) error + + // Fail marks message transition as failed + Fail(paymentHash [32]byte) error +} + +// paymentControl is implementation of ControlTower to restrict double payment +// sending. +type paymentControl struct { + mx sync.Mutex + + db *channeldb.DB +} + +// NewPaymentControl creates a new instance of the paymentControl. +func NewPaymentControl(db *channeldb.DB) ControlTower { + return &paymentControl{ + db: db, + } +} + +// CheckSend checks that a sending htlc wasn't triggered before for specific +// payment hash, if so, should trigger error depends on current status +func (p *paymentControl) CheckSend(htlc *lnwire.UpdateAddHTLC) error { + p.mx.Lock() + defer p.mx.Unlock() + + // Retrieve current status of payment from local database. + paymentStatus, err := p.db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + return err + } + + switch paymentStatus { + case channeldb.StatusGrounded: + // It is safe to reattempt a payment if we know that we haven't + // left one in flight prior to restarting and switch. + return p.db.UpdatePaymentStatus(htlc.PaymentHash, + channeldb.StatusInFlight) + + case channeldb.StatusInFlight: + // Not clear if it's safe to reinitiate a payment if there + // is already a payment in flight, so we should withhold any + // additional attempts to send to that payment hash. + return ErrPaymentInFlight + + case channeldb.StatusCompleted: + // It has been already paid and don't want to pay again. + return ErrAlreadyPaid + } + + return nil +} + +// Success proceed status changing of payment to next successful status +func (p *paymentControl) Success(paymentHash [32]byte) error { + p.mx.Lock() + defer p.mx.Unlock() + + paymentStatus, err := p.db.FetchPaymentStatus(paymentHash) + if err != nil { + return err + } + + switch paymentStatus { + case channeldb.StatusGrounded: + // Payment isn't initiated but received. + return ErrPaymentNotInitiated + + case channeldb.StatusInFlight: + // Successful transition from InFlight transition to Completed. + return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusCompleted) + + case channeldb.StatusCompleted: + // Payment is completed before in should be ignored. + return ErrPaymentAlreadyCompleted + } + + return nil +} + +// Fail proceed status changing of payment to initial status in case of failure +func (p *paymentControl) Fail(paymentHash [32]byte) error { + p.mx.Lock() + defer p.mx.Unlock() + + paymentStatus, err := p.db.FetchPaymentStatus(paymentHash) + if err != nil { + return err + } + + switch paymentStatus { + case channeldb.StatusGrounded: + // Unpredictable behavior when payment wasn't transited to + // StatusInFlight status and was failed. + return ErrPaymentNotInitiated + + case channeldb.StatusInFlight: + // If payment wasn't processed by some reason should return to + // default status to unlock retrying option for the same payment hash. + return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusGrounded) + + case channeldb.StatusCompleted: + // Payment is completed before and can't be moved to another status. + return ErrPaymentAlreadyCompleted + } + + return nil +} From 350287779befb476a2d43ef6706dac961320dd4e Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:19:12 +0300 Subject: [PATCH 07/32] htlcswitch: control tower test of payment statuses transitions --- htlcswitch/switch_control_test.go | 206 ++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 htlcswitch/switch_control_test.go diff --git a/htlcswitch/switch_control_test.go b/htlcswitch/switch_control_test.go new file mode 100644 index 00000000..ec109718 --- /dev/null +++ b/htlcswitch/switch_control_test.go @@ -0,0 +1,206 @@ +package htlcswitch + +import ( + "fmt" + "testing" + + "github.com/btcsuite/fastsha256" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +func genHtlc() (*lnwire.UpdateAddHTLC, error) { + preimage, err := genPreimage() + if err != nil { + return nil, fmt.Errorf("unable to generate preimage: %v", err) + } + + rhash := fastsha256.Sum256(preimage[:]) + htlc := &lnwire.UpdateAddHTLC{ + PaymentHash: rhash, + Amount: 1, + } + + return htlc, nil +} + +// TestPaymentControlSwitch checks the ability of payment control +// change states of payments +func TestPaymentControlSwitch(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + // Sends base htlc message which initiate base status + // and move it to StatusInFlight and verifies that it + // was changed. + if err := pControl.CheckSend(htlc); err != nil { + t.Fatalf("unable to send htlc message: %v", err) + } + + pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusInFlight { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusInFlight, pStatus) + } + + // Verifies that status was changed to StatusCompleted. + if err := pControl.Success(htlc.PaymentHash); err != nil { + t.Fatalf("error shouldn't have been received, got: %v", err) + } + + pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusCompleted { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusCompleted, pStatus) + } +} + +// TestPaymentControlSwitchFail checks that payment status returns +// to initial status after fail +func TestPaymentControlSwitchFail(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + // Sends base htlc message which initiate StatusInFlight. + if err := pControl.CheckSend(htlc); err != nil { + t.Fatalf("unable to send htlc message: %v", err) + } + + pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusInFlight { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusInFlight, pStatus) + } + + // Move payment to completed status, second payment should return error. + pControl.Fail(htlc.PaymentHash) + + pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusGrounded { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusGrounded, pStatus) + } +} + +// TestPaymentControlSwitchDoubleSend checks the ability of payment control +// to prevent double sending of htlc message, when message is in StatusInFlight +func TestPaymentControlSwitchDoubleSend(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + // Sends base htlc message which initiate base status + // and move it to StatusInFlight and verifies that it + // was changed. + if err := pControl.CheckSend(htlc); err != nil { + t.Fatalf("unable to send htlc message: %v", err) + } + + pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusInFlight { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusInFlight, pStatus) + } + + // Tries to initiate double sending of htlc message with the same + // payment hash. + if err := pControl.CheckSend(htlc); err != ErrPaymentInFlight { + t.Fatalf("payment control wrong behaviour: " + + "double sending must trigger ErrPaymentInFlight error") + } +} + +// TestPaymentControlSwitchDoublePay checks the ability of payment control +// to prevent double payment +func TestPaymentControlSwitchDoublePay(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + // Sends base htlc message which initiate StatusInFlight. + if err := pControl.CheckSend(htlc); err != nil { + t.Fatalf("unable to send htlc message: %v", err) + } + + pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusInFlight { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusInFlight, pStatus) + } + + // Move payment to completed status, second payment should return error. + if err := pControl.Success(htlc.PaymentHash); err != nil { + t.Fatalf("error shouldn't have been received, got: %v", err) + } + + if err := pControl.CheckSend(htlc); err != ErrAlreadyPaid { + t.Fatalf("payment control wrong behaviour:" + + " double payment must trigger ErrAlreadyPaid") + } +} From 21fc7aa8294f926506f6b4dafe32eddd4b0cd1b4 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:19:54 +0300 Subject: [PATCH 08/32] htlcswitch: integration of control tower to htlc switch --- htlcswitch/switch.go | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 8f94a9a0..a13172bc 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -9,12 +9,11 @@ import ( "sync/atomic" "time" + "github.com/boltdb/bolt" "github.com/btcsuite/btcd/btcec" - "github.com/coreos/bbolt" - "github.com/davecgh/go-spew/spew" - "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" @@ -204,6 +203,9 @@ type Switch struct { paymentSequencer Sequencer + // control provides verification of sending htlc mesages + control ControlTower + // circuits is storage for payment circuits which are used to // forward the settle/fail htlc updates back to the add htlc initiator. circuits CircuitMap @@ -289,6 +291,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) { cfg: &cfg, circuits: circuitMap, paymentSequencer: sequencer, + control: NewPaymentControl(cfg.DB), linkIndex: make(map[lnwire.ChannelID]ChannelLink), mailOrchestrator: newMailOrchestrator(), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), @@ -344,6 +347,11 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, htlc *lnwire.UpdateAddHTLC, deobfuscator ErrorDecrypter) ([sha256.Size]byte, error) { + // Verify message by ControlTower implementation. + if err := s.control.CheckSend(htlc); err != nil { + return zeroPreimage, err + } + // Create payment and add to the map of payment in order later to be // able to retrieve it and return response to the user. payment := &pendingPayment{ @@ -376,6 +384,10 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, if err := s.forward(packet); err != nil { s.removePendingPayment(paymentID) + if err := s.control.Fail(htlc.PaymentHash); err != nil { + return zeroPreimage, err + } + return zeroPreimage, err } @@ -837,6 +849,10 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { payment.preimage <- htlc.PaymentPreimage s.removePendingPayment(pkt.incomingHTLCID) + if err := s.control.Success(pkt.circuit.PaymentHash); err != nil { + return err + } + // We've just received a fail update which means we can finalize the // user payment and return fail response. case *lnwire.UpdateFailHTLC: @@ -901,6 +917,10 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, FailureMessage: lnwire.FailPermanentChannelFailure{}, } + if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil { + log.Error(err) + } + // A regular multi-hop payment error that we'll need to // decrypt. default: @@ -917,6 +937,10 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, ExtraMsg: userErr, FailureMessage: lnwire.NewTemporaryChannelFailure(nil), } + + if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil { + log.Error(err) + } } } From 033fd3c83d1135070b231ea618ff2076a51c9d7e Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:20:57 +0300 Subject: [PATCH 09/32] htlcswitch: add test for integrated control tower --- htlcswitch/link_test.go | 4 ++-- htlcswitch/mock.go | 26 +++++++++++++++++++------- htlcswitch/switch_test.go | 31 +++++-------------------------- 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index a5f417be..b48587e6 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -3755,8 +3755,8 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) { n.firstBobChannelLink.ShortChanID(), htlc, newMockDeobfuscator(), ) - if err != nil { - t.Fatalf("error shouldn't have been received got: %v", err) + if err != ErrAlreadyPaid { + t.Fatalf("ErrAlreadyPaid should have been received got: %v", err) } } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 9df91993..cd563ce7 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -124,14 +124,26 @@ type mockServer struct { var _ lnpeer.Peer = (*mockServer)(nil) -func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) { - if db == nil { - tempPath, err := ioutil.TempDir("", "switchdb") - if err != nil { - return nil, err - } +func initDB() (*channeldb.DB, error) { + tempPath, err := ioutil.TempDir("", "switchdb") + if err != nil { + return nil, err + } - db, err = channeldb.Open(tempPath) + db, err := channeldb.Open(tempPath) + if err != nil { + return nil, err + } + + return db, err +} + + +func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) { + var err error + + if db == nil { + db, err = initDB() if err != nil { return nil, err } diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 7610a797..6dd8980f 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -1678,7 +1678,9 @@ func TestSwitchSendPayment(t *testing.T) { } case err := <-errChan: - t.Fatalf("unable to send payment: %v", err) + if err != ErrPaymentInFlight { + t.Fatalf("unable to send payment: %v", err) + } case <-time.After(time.Second): t.Fatal("request was not propagated to destination") } @@ -1695,11 +1697,11 @@ func TestSwitchSendPayment(t *testing.T) { t.Fatal("request was not propagated to destination") } - if s.numPendingPayments() != 2 { + if s.numPendingPayments() != 1 { t.Fatal("wrong amount of pending payments") } - if s.circuits.NumOpen() != 2 { + if s.circuits.NumOpen() != 1 { t.Fatal("wrong amount of circuits") } @@ -1735,29 +1737,6 @@ func TestSwitchSendPayment(t *testing.T) { t.Fatal("err wasn't received") } - packet = &htlcPacket{ - outgoingChanID: aliceChannelLink.ShortChanID(), - outgoingHTLCID: 1, - htlc: &lnwire.UpdateFailHTLC{ - Reason: reason, - }, - } - - // Send second failure response and check that user were able to - // receive the error. - if err := s.forward(packet); err != nil { - t.Fatalf("can't forward htlc packet: %v", err) - } - - select { - case err := <-errChan: - if err.Error() != errors.New(lnwire.CodeIncorrectPaymentAmount).Error() { - t.Fatal("err wasn't received") - } - case <-time.After(time.Second): - t.Fatal("err wasn't received") - } - if s.numPendingPayments() != 0 { t.Fatal("wrong amount of pending payments") } From 76dbe670cb28247cca6f303a6cd604d5dc549536 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Sun, 12 Aug 2018 16:23:08 +0300 Subject: [PATCH 10/32] lnd: add integration test checks unavailability to pay same preimage --- lnd_test.go | 56 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index dce4e5f4..9d6b45e7 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -450,6 +450,17 @@ func completePaymentRequests(ctx context.Context, client lnrpc.LightningClient, return nil } +// makeFakePayHash creates random pre image hash +func makeFakePayHash(t *harnessTest) []byte { + randBuf := make([]byte, 32) + + if _, err := rand.Read(randBuf); err != nil { + t.Fatalf("internal error, cannot generate random string: %v", err) + } + + return randBuf +} + const ( AddrTypeWitnessPubkeyHash = lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH AddrTypeNestedPubkeyHash = lnrpc.NewAddressRequest_NESTED_PUBKEY_HASH @@ -1825,13 +1836,13 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to create payment stream for alice: %v", err) } + carolPubKey := carol.PubKey[:] - payHash := bytes.Repeat([]byte{2}, 32) for i := 0; i < numInvoices; i++ { err = alicePayStream.Send(&lnrpc.SendRequest{ Dest: carolPubKey, Amt: int64(paymentAmt), - PaymentHash: payHash, + PaymentHash: makeFakePayHash(t), FinalCltvDelta: defaultBitcoinTimeLockDelta, }) if err != nil { @@ -3825,9 +3836,16 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) { const paymentAmt = 70000 payReqs := make([]string, numPayments) for i := 0; i < numPayments; i++ { + preimage := make([]byte, 32) + _, err := rand.Read(preimage) + if err != nil { + t.Fatalf("unable to generate preimage: %v", err) + } + invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt, + Memo: "testing", + RPreimage: preimage, + Value: paymentAmt, } resp, err := net.Bob.AddInvoice(ctxb, invoice) if err != nil { @@ -3888,9 +3906,16 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) { const paymentAmt60k = 60000 payReqs = make([]string, numPayments) for i := 0; i < numPayments; i++ { + preimage := make([]byte, 32) + _, err := rand.Read(preimage) + if err != nil { + t.Fatalf("unable to generate preimage: %v", err) + } + invoice := &lnrpc.Invoice{ - Memo: "testing", - Value: paymentAmt60k, + Memo: "testing", + RPreimage: preimage, + Value: paymentAmt60k, } resp, err := carol.AddInvoice(ctxb, invoice) if err != nil { @@ -4376,10 +4401,9 @@ func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) { // TODO(roasbeef): make global list of invoices for each node to re-use // and avoid collisions const paymentAmt = 1000 - preimage := bytes.Repeat([]byte{byte(90)}, 32) invoice := &lnrpc.Invoice{ Memo: "testing", - RPreimage: preimage, + RPreimage: makeFakePayHash(t), Value: paymentAmt, } invoiceResp, err := net.Bob.AddInvoice(ctxb, invoice) @@ -6607,7 +6631,7 @@ out: // stream on payment error. ctxt, _ = context.WithTimeout(ctxb, timeout) sendReq := &lnrpc.SendRequest{ - PaymentHashString: hex.EncodeToString(bytes.Repeat([]byte("Z"), 32)), + PaymentHashString: hex.EncodeToString(makeFakePayHash(t)), DestString: hex.EncodeToString(carol.PubKey[:]), Amt: payAmt, } @@ -6736,6 +6760,12 @@ out: "instead: %v", resp.PaymentError) } + // Generate new invoice to not pay same invoice twice. + carolInvoice, err = carol.AddInvoice(ctxb, invoiceReq) + if err != nil { + t.Fatalf("unable to generate carol invoice: %v", err) + } + // For our final test, we'll ensure that if a target link isn't // available for what ever reason then the payment fails accordingly. // @@ -7833,8 +7863,8 @@ func testMultiHopHtlcLocalTimeout(net *lntest.NetworkHarness, t *harnessTest) { // We'll create two random payment hashes unknown to carol, then send // each of them by manually specifying the HTLC details. carolPubKey := carol.PubKey[:] - dustPayHash := bytes.Repeat([]byte{1}, 32) - payHash := bytes.Repeat([]byte{2}, 32) + dustPayHash := makeFakePayHash(t) + payHash := makeFakePayHash(t) err = alicePayStream.Send(&lnrpc.SendRequest{ Dest: carolPubKey, Amt: int64(dustHtlcAmt), @@ -8292,7 +8322,7 @@ func testMultiHopLocalForceCloseOnChainHtlcTimeout(net *lntest.NetworkHarness, // We'll now send a single HTLC across our multi-hop network. carolPubKey := carol.PubKey[:] - payHash := bytes.Repeat([]byte{2}, 32) + payHash := makeFakePayHash(t) err = alicePayStream.Send(&lnrpc.SendRequest{ Dest: carolPubKey, Amt: int64(htlcAmt), @@ -8549,7 +8579,7 @@ func testMultiHopRemoteForceCloseOnChainHtlcTimeout(net *lntest.NetworkHarness, // We'll now send a single HTLC across our multi-hop network. carolPubKey := carol.PubKey[:] - payHash := bytes.Repeat([]byte{2}, 32) + payHash := makeFakePayHash(t) err = alicePayStream.Send(&lnrpc.SendRequest{ Dest: carolPubKey, Amt: int64(htlcAmt), From 79a42033467b201d139da67d84ba8740e767d1ba Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 13:59:20 -0700 Subject: [PATCH 11/32] channeldb/db: add comment describing migration --- channeldb/db.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/channeldb/db.go b/channeldb/db.go index c90d4885..939bbdcb 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -68,8 +68,9 @@ var ( migration: migrateEdgePolicies, }, { - // The version with added payment statuses - // for each existing payment + // The DB version where we persist each attempt to send + // an HTLC to a payment hash, and track whether the + // payment is in-flight, succeeded, or failed. number: 5, migration: paymentStatusesMigration, }, From 090e97cd3b330ab31287af8e3fdeb6d77a8ffefe Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:29:58 -0700 Subject: [PATCH 12/32] channeldb/db_test: move migration helper back to meta_test --- channeldb/db_test.go | 55 -------------------------------------------- 1 file changed, 55 deletions(-) diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 7edae3c5..f3e3c96e 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -5,8 +5,6 @@ import ( "os" "path/filepath" "testing" - - "github.com/go-errors/errors" ) func TestOpenWithCreate(t *testing.T) { @@ -35,56 +33,3 @@ func TestOpenWithCreate(t *testing.T) { t.Fatalf("channeldb failed to create data directory") } } - -// applyMigration is a helper test function that encapsulates the general steps -// which are needed to properly check the result of applying migration function. -func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), - migrationFunc migration, shouldFail bool) { - - cdb, cleanUp, err := makeTestDB() - defer cleanUp() - if err != nil { - t.Fatal(err) - } - - // beforeMigration usually used for populating the database - // with test data. - beforeMigration(cdb) - - // Create test meta info with zero database version and put it on disk. - // Than creating the version list pretending that new version was added. - meta := &Meta{DbVersionNumber: 0} - if err := cdb.PutMeta(meta); err != nil { - t.Fatalf("unable to store meta data: %v", err) - } - - versions := []version{ - { - number: 0, - migration: nil, - }, - { - number: 1, - migration: migrationFunc, - }, - } - - defer func() { - if r := recover(); r != nil { - err = errors.New(r) - } - - if err == nil && shouldFail { - t.Fatal("error wasn't received on migration stage") - } else if err != nil && !shouldFail { - t.Fatal("error was received on migration stage") - } - - // afterMigration usually used for checking the database state and - // throwing the error if something went wrong. - afterMigration(cdb) - }() - - // Sync with the latest version - applying migration function. - err = cdb.syncVersions(versions) -} From 9f467275072db18f32076fc910c2c83cba705b55 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:30:44 -0700 Subject: [PATCH 13/32] channeldb/meta_test: restore migration helper + godocs --- channeldb/meta_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index 28ced7d8..c98999b4 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -9,6 +9,59 @@ import ( "github.com/go-errors/errors" ) +// applyMigration is a helper test function that encapsulates the general steps +// which are needed to properly check the result of applying migration function. +func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), + migrationFunc migration, shouldFail bool) { + + cdb, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatal(err) + } + + // beforeMigration usually used for populating the database + // with test data. + beforeMigration(cdb) + + // Create test meta info with zero database version and put it on disk. + // Than creating the version list pretending that new version was added. + meta := &Meta{DbVersionNumber: 0} + if err := cdb.PutMeta(meta); err != nil { + t.Fatalf("unable to store meta data: %v", err) + } + + versions := []version{ + { + number: 0, + migration: nil, + }, + { + number: 1, + migration: migrationFunc, + }, + } + + defer func() { + if r := recover(); r != nil { + err = errors.New(r) + } + + if err == nil && shouldFail { + t.Fatal("error wasn't received on migration stage") + } else if err != nil && !shouldFail { + t.Fatal("error was received on migration stage") + } + + // afterMigration usually used for checking the database state and + // throwing the error if something went wrong. + afterMigration(cdb) + }() + + // Sync with the latest version - applying migration function. + err = cdb.syncVersions(versions) +} + // TestVersionFetchPut checks the propernces of fetch/put methods // and also initialization of meta data in case if don't have any in // database. @@ -118,6 +171,8 @@ func TestGlobalVersionList(t *testing.T) { } } +// TestMigrationWithPanic asserts that if migration logic panics, we will return +// to the original state unaltered. func TestMigrationWithPanic(t *testing.T) { t.Parallel() @@ -189,6 +244,8 @@ func TestMigrationWithPanic(t *testing.T) { true) } +// TestMigrationWithFatal asserts that migrations which fail do not modify the +// database. func TestMigrationWithFatal(t *testing.T) { t.Parallel() @@ -259,6 +316,8 @@ func TestMigrationWithFatal(t *testing.T) { true) } +// TestMigrationWithoutErrors asserts that a successful migration has its +// changes applied to the database. func TestMigrationWithoutErrors(t *testing.T) { t.Parallel() From 93e5f9a5451e1c3f7662a1ef9eed372a9cc551b2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:31:07 -0700 Subject: [PATCH 14/32] channeldb/migrations: touch up documentation --- channeldb/migrations.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 60e58fb6..5a127041 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -386,17 +386,18 @@ func paymentStatusesMigration(tx *bolt.Tx) error { } // Get the bucket dedicated to storing statuses of payments, - // where a key is payment hash, value is payment status + // where a key is payment hash, value is payment status. paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) if err != nil { return err } - log.Infof("Migration database adds to all existing payments " + - "statuses as Completed") + log.Infof("Migrating database to support payment statuses -- " + + "marking all existing payments with status Completed") - // For each payment in the bucket, fetch all data. - return bucket.ForEach(func(k, v []byte) error { + // For each payment in the bucket, deserialize the payment and mark it + // as completed. + err = bucket.ForEach(func(k, v []byte) error { // Ignores if it is sub-bucket. if v == nil { return nil @@ -411,9 +412,16 @@ func paymentStatusesMigration(tx *bolt.Tx) error { // Calculate payment hash for current payment. paymentHash := sha256.Sum256(payment.PaymentPreimage[:]) - // Tries to update status for current payment to completed - // if it fails - migration abort transaction and return payment bucket - // to previous state. + // Update status for current payment to completed. If it fails, + // the migration is aborted and the payment bucket is returned + // to its previous state. return paymentStatuses.Put(paymentHash[:], StatusCompleted.Bytes()) }) -} \ No newline at end of file + if err != nil { + return err + } + + log.Infof("Migration of payment statuses complete!") + + return nil +} From 3a579f330504c15d80decffa509ed078e34cdb68 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:31:29 -0700 Subject: [PATCH 15/32] channeldb/migrations_test: touch up docs --- channeldb/migrations_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index 6adf76c1..7a1cc1ad 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -5,14 +5,16 @@ import ( "testing" ) +// TestPaymentStatusesMigration checks that already completed payments will have +// their payment statuses set to Completed after the migration. func TestPaymentStatusesMigration(t *testing.T) { t.Parallel() fakePayment := makeFakePayment() paymentHash := sha256.Sum256(fakePayment.PaymentPreimage[:]) - // Add fake payment to the test database and verifies that it was created - // and there is only one payment and its status is not "Completed". + // Add fake payment to test database, verifying that it was created, + // that we have only one payment, and its status is not "Completed". beforeMigrationFunc := func(d *DB) { if err := d.AddPayment(fakePayment); err != nil { t.Fatalf("unable to add payment: %v", err) @@ -40,8 +42,8 @@ func TestPaymentStatusesMigration(t *testing.T) { } } - // Verify that was created payment status "Completed" for our one fake - // payment. + // Verify that the created payment status is "Completed" for our one + // fake payment. afterMigrationFunc := func(d *DB) { meta, err := d.FetchMeta(nil) if err != nil { From e7c0f4c5dcb1a2a82762c919b9941ddbe0597d05 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:31:47 -0700 Subject: [PATCH 16/32] channeldb/payments: touch up docs --- channeldb/payments.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/channeldb/payments.go b/channeldb/payments.go index aa880b0b..d8d1a856 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -28,25 +28,26 @@ var ( type PaymentStatus byte const ( - // StatusGrounded is status where payment is initiated and received - // an intermittent failure + // StatusGrounded is the status where a payment has never been + // initiated, or has been initiated and received an intermittent + // failure. StatusGrounded PaymentStatus = 0 - // StatusInFlight is status where payment is initiated, but a response - // has not been received + // StatusInFlight is the status where a payment has been initiated, but + // a response has not been received. StatusInFlight PaymentStatus = 1 - // StatusCompleted is status where payment is initiated and complete - // a payment successfully + // StatusCompleted is the status where a payment has been initiated and + // the payment was completed successfully. StatusCompleted PaymentStatus = 2 ) -// Bytes returns status as slice of bytes +// Bytes returns status as slice of bytes. func (ps PaymentStatus) Bytes() []byte { return []byte{byte(ps)} } -// FromBytes sets status from slice of bytes +// FromBytes sets status from slice of bytes. func (ps *PaymentStatus) FromBytes(status []byte) error { if len(status) != 1 { return errors.New("payment status is empty") @@ -62,7 +63,7 @@ func (ps *PaymentStatus) FromBytes(status []byte) error { return nil } -// String returns readable representation of payment status +// String returns readable representation of payment status. func (ps PaymentStatus) String() string { switch ps { case StatusGrounded: @@ -186,7 +187,7 @@ func (db *DB) DeleteAllPayments() error { }) } -// UpdatePaymentStatus sets status for outgoing/finished payment to store status in +// UpdatePaymentStatus sets the payment status for outgoing/finished payments in // local database. func (db *DB) UpdatePaymentStatus(paymentHash [32]byte, status PaymentStatus) error { return db.Batch(func(tx *bolt.Tx) error { @@ -199,10 +200,10 @@ func (db *DB) UpdatePaymentStatus(paymentHash [32]byte, status PaymentStatus) er }) } -// FetchPaymentStatus returns payment status for outgoing payment -// if status of the payment isn't found it set to default status "StatusGrounded". +// FetchPaymentStatus returns the payment status for outgoing payment. +// If status of the payment isn't found, it will default to "StatusGrounded". func (db *DB) FetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) { - // default status for all payments that wasn't recorded in database + // The default status for all payments that aren't recorded in database. paymentStatus := StatusGrounded err := db.View(func(tx *bolt.Tx) error { From d6083e0d66ee1e2ce459eccc82fa600af437dfcb Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 13:59:50 -0700 Subject: [PATCH 17/32] htlcswitch/switch: reorder persistent calls and app ntfn... for Settle/Fail responses. --- htlcswitch/switch.go | 150 +++++++++++++++++++++++-------------------- 1 file changed, 81 insertions(+), 69 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index a13172bc..d6e6a46a 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -9,10 +9,10 @@ import ( "sync/atomic" "time" - "github.com/boltdb/bolt" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -63,7 +63,6 @@ type pendingPayment struct { amount lnwire.MilliSatoshi preimage chan [sha256.Size]byte - response chan *htlcPacket err chan error // deobfuscator is a serializable entity which is used if we received @@ -347,8 +346,10 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, htlc *lnwire.UpdateAddHTLC, deobfuscator ErrorDecrypter) ([sha256.Size]byte, error) { - // Verify message by ControlTower implementation. - if err := s.control.CheckSend(htlc); err != nil { + // Before sending, double check that we don't already have 1) an + // in-flight payment to this payment hash, or 2) a complete payment for + // the same hash. + if err := s.control.ClearForTakeoff(htlc); err != nil { return zeroPreimage, err } @@ -356,7 +357,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, // able to retrieve it and return response to the user. payment := &pendingPayment{ err: make(chan error, 1), - response: make(chan *htlcPacket, 1), preimage: make(chan [sha256.Size]byte, 1), paymentHash: htlc.PaymentHash, amount: htlc.Amount, @@ -394,7 +394,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, // Returns channels so that other subsystem might wait/skip the // waiting of handling of payment. var preimage [sha256.Size]byte - var response *htlcPacket select { case e := <-payment.err: @@ -403,13 +402,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, return zeroPreimage, ErrSwitchExiting } - select { - case pkt := <-payment.response: - response = pkt - case <-s.quit: - return zeroPreimage, ErrSwitchExiting - } - select { case p := <-payment.preimage: preimage = p @@ -417,24 +409,6 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, return zeroPreimage, ErrSwitchExiting } - // Remove circuit since we are about to complete an add/fail of this - // HTLC. - if teardownErr := s.teardownCircuit(response); teardownErr != nil { - log.Warnf("unable to teardown circuit %s: %v", - response.inKey(), teardownErr) - return preimage, err - } - - // Finally, if this response is contained in a forwarding package, ack - // the settle/fail so that we don't continue to retransmit the HTLC - // internally. - if response.destRef != nil { - if ackErr := s.ackSettleFail(*response.destRef); ackErr != nil { - log.Warnf("unable to ack settle/fail reference: %s: %v", - *response.destRef, ackErr) - } - } - return preimage, err } @@ -782,20 +756,10 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, // Alice Bob Carol // func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { - // Pending payments use a special interpretation of the incomingChanID and - // incomingHTLCID fields on packet where the channel ID is blank and the - // HTLC ID is the payment ID. The switch basically views the users of the - // node as a special channel that also offers a sequence of HTLCs. - payment, err := s.findPayment(pkt.incomingHTLCID) - if err != nil { - return err - } - - switch htlc := pkt.htlc.(type) { - // User have created the htlc update therefore we should find the // appropriate channel link and send the payment over this link. - case *lnwire.UpdateAddHTLC: + if htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC); ok { + // Try to find links by node destination. s.indexMtx.RLock() link, err := s.getLinkByShortID(pkt.outgoingChanID) s.indexMtx.RUnlock() @@ -839,32 +803,85 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { } return link.HandleSwitchPacket(pkt) + } - // We've just received a settle update which means we can finalize the - // user payment and return successful response. + // Otherwise this is a response to a payment that we initiated. We'll + // clean up any fwdpkg references, circuit entries, and mark in our db + // that the payment for this payment hash has either succeeded or + // failed. + // + // If this response is contained in a forwarding package, we'll start by + // acking the settle/fail so that we don't continue to retransmit the + // HTLC internally. + if pkt.destRef != nil { + if err := s.ackSettleFail(*pkt.destRef); err != nil { + log.Warnf("Unable to ack settle/fail reference: %s: %v", + *pkt.destRef, err) + return err + } + } + + // Next, we'll remove the circuit since we are about to complete an + // fulfill/fail of this HTLC. Since we've already removed the + // settle/fail fwdpkg reference, the response from the peer cannot be + // replayed internally if this step fails. If this happens, this logic + // will be executed when a provided resolution message comes through. + // This can only happen if the circuit is still open, which is why this + // ordering is chosen. + if err := s.teardownCircuit(pkt); err != nil { + log.Warnf("Unable to teardown circuit %s: %v", + pkt.inKey(), err) + return err + } + + // Locate the pending payment to notify the application that this + // payment has failed. If one is not found, it likely means the daemon + // has been restarted since sending the payment. + payment := s.findPayment(pkt.incomingHTLCID) + + var ( + preimage [32]byte + paymentErr error + ) + + switch htlc := pkt.htlc.(type) { + + // We've received a settle update which means we can finalize the user + // payment and return successful response. case *lnwire.UpdateFulfillHTLC: - // Notify the user that his payment was successfully proceed. - payment.err <- nil - payment.response <- pkt - payment.preimage <- htlc.PaymentPreimage - s.removePendingPayment(pkt.incomingHTLCID) - + // Persistently mark that a payment to this payment hash + // succeeded. This will prevent us from ever making another + // payment to this hash. if err := s.control.Success(pkt.circuit.PaymentHash); err != nil { return err } - // We've just received a fail update which means we can finalize the - // user payment and return fail response. + preimage = htlc.PaymentPreimage + + // We've received a fail update which means we can finalize the user + // payment and return fail response. case *lnwire.UpdateFailHTLC: - payment.err <- s.parseFailedPayment(payment, pkt, htlc) - payment.response <- pkt - payment.preimage <- zeroPreimage - s.removePendingPayment(pkt.incomingHTLCID) + // Persistently mark that a payment to this payment hash failed. + // This will permit us to make another attempt at a successful + // payment. + if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil { + return err + } + + paymentErr = s.parseFailedPayment(payment, pkt, htlc) default: return errors.New("wrong update type") } + // Deliver the payment error and preimage to the application, if it is + // waiting for a response. + if payment != nil { + payment.err <- paymentErr + payment.preimage <- preimage + s.removePendingPayment(pkt.incomingHTLCID) + } + return nil } @@ -890,7 +907,8 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, failureMsg, err := lnwire.DecodeFailure(r, 0) if err != nil { userErr = fmt.Sprintf("unable to decode onion failure, "+ - "htlc with hash(%x): %v", payment.paymentHash[:], err) + "htlc with hash(%x): %v", + payment.paymentHash[:], err) log.Error(userErr) // As this didn't even clear the link, we don't need to @@ -917,10 +935,6 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, FailureMessage: lnwire.FailPermanentChannelFailure{}, } - if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil { - log.Error(err) - } - // A regular multi-hop payment error that we'll need to // decrypt. default: @@ -937,10 +951,6 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, ExtraMsg: userErr, FailureMessage: lnwire.NewTemporaryChannelFailure(nil), } - - if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil { - log.Error(err) - } } } @@ -2080,16 +2090,18 @@ func (s *Switch) removePendingPayment(paymentID uint64) error { } // findPayment is the helper function which find the payment. -func (s *Switch) findPayment(paymentID uint64) (*pendingPayment, error) { +func (s *Switch) findPayment(paymentID uint64) *pendingPayment { s.pendingMutex.RLock() defer s.pendingMutex.RUnlock() payment, ok := s.pendingPayments[paymentID] if !ok { - return nil, fmt.Errorf("Cannot find pending payment with ID %d", + log.Errorf("Cannot find pending payment with ID %d", paymentID) + return nil } - return payment, nil + + return payment } // CircuitModifier returns a reference to subset of the interfaces provided by From 9b52c510e991e63afa0f02a1e542862839230966 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Aug 2018 20:26:54 -0700 Subject: [PATCH 18/32] htlcswitch/mock: remove new line to satisfy linter --- htlcswitch/mock.go | 1 - 1 file changed, 1 deletion(-) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index cd563ce7..e2539fac 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -138,7 +138,6 @@ func initDB() (*channeldb.DB, error) { return db, err } - func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) { var err error From 875128539c13d8e0a5e6b98a1c3b0ccad6878966 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:00:50 -0700 Subject: [PATCH 19/32] htlcswitch/switch_control: expand godocs and add ErrUnknownPaymentStatus --- htlcswitch/switch_control.go | 113 ++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 41 deletions(-) diff --git a/htlcswitch/switch_control.go b/htlcswitch/switch_control.go index d62b8872..a0492c34 100644 --- a/htlcswitch/switch_control.go +++ b/htlcswitch/switch_control.go @@ -9,36 +9,53 @@ import ( ) var ( - // ErrAlreadyPaid is used when we have already paid - ErrAlreadyPaid = errors.New("invoice was already paid") + // ErrAlreadyPaid signals we have already paid this payment hash. + ErrAlreadyPaid = errors.New("invoice is already paid") - // ErrPaymentInFlight returns in case if payment is already "in flight" + // ErrPaymentInFlight signals that payment for this payment hash is + // already "in flight" on the network. ErrPaymentInFlight = errors.New("payment is in transition") - // ErrPaymentNotInitiated returns in case if payment wasn't initiated - // in switch + // ErrPaymentNotInitiated is returned if payment wasn't initiated in + // switch. ErrPaymentNotInitiated = errors.New("payment isn't initiated") - // ErrPaymentAlreadyCompleted returns in case of attempt to complete - // completed payment + // ErrPaymentAlreadyCompleted is returned in the event we attempt to + // recomplete a completed payment. ErrPaymentAlreadyCompleted = errors.New("payment is already completed") + + // ErrUnknownPaymentStatus is returned when we do not recognize the + // existing state of a payment. + ErrUnknownPaymentStatus = errors.New("unknown payment status") ) -// ControlTower is a controller interface of sending HTLC messages to switch +// ControlTower tracks all outgoing payments made by the switch, whose primary +// purpose is to prevent duplicate payments to the same payment hash. In +// production, a persistent implementation is preferred so that tracking can +// survive across restarts. Payments are transition through various payment +// states, and the ControlTower interface provides access to driving the state +// transitions. type ControlTower interface { - // CheckSend intercepts incoming message to provide checks - // and fail if specific message is not allowed by implementation - CheckSend(htlc *lnwire.UpdateAddHTLC) error + // ClearForTakeoff atomically checks that no inflight or completed + // payments exist for this payment hash. If none are found, this method + // atomically transitions the status for this payment hash as InFlight. + ClearForTakeoff(htlc *lnwire.UpdateAddHTLC) error - // Success marks message transition as successful + // Success transitions an InFlight payment into a Completed payment. + // After invoking this method, ClearForTakeoff should always return an + // error to prevent us from making duplicate payments to the same + // payment hash. Success(paymentHash [32]byte) error - // Fail marks message transition as failed + // Fail transitions an InFlight payment into a Grounded Payment. After + // invoking this method, ClearForTakeoff should return nil on its next + // call for this payment hash, allowing the switch to make a subsequent + // payment. Fail(paymentHash [32]byte) error } -// paymentControl is implementation of ControlTower to restrict double payment -// sending. +// paymentControl is persistent implementation of ControlTower to restrict +// double payment sending. type paymentControl struct { mx sync.Mutex @@ -52,9 +69,9 @@ func NewPaymentControl(db *channeldb.DB) ControlTower { } } -// CheckSend checks that a sending htlc wasn't triggered before for specific -// payment hash, if so, should trigger error depends on current status -func (p *paymentControl) CheckSend(htlc *lnwire.UpdateAddHTLC) error { +// ClearForTakeoff checks that we don't already have an InFlight or Completed +// payment identified by the same payment hash. +func (p *paymentControl) ClearForTakeoff(htlc *lnwire.UpdateAddHTLC) error { p.mx.Lock() defer p.mx.Unlock() @@ -65,27 +82,31 @@ func (p *paymentControl) CheckSend(htlc *lnwire.UpdateAddHTLC) error { } switch paymentStatus { + case channeldb.StatusGrounded: // It is safe to reattempt a payment if we know that we haven't - // left one in flight prior to restarting and switch. - return p.db.UpdatePaymentStatus(htlc.PaymentHash, - channeldb.StatusInFlight) + // left one in flight. Since this one is grounded, Transition + // the payment status to InFlight to prevent others. + return p.db.UpdatePaymentStatus(htlc.PaymentHash, channeldb.StatusInFlight) case channeldb.StatusInFlight: - // Not clear if it's safe to reinitiate a payment if there - // is already a payment in flight, so we should withhold any - // additional attempts to send to that payment hash. + // We already have an InFlight payment on the network. We will + // disallow any more payment until a response is received. return ErrPaymentInFlight case channeldb.StatusCompleted: - // It has been already paid and don't want to pay again. + // We've already completed a payment to this payment hash, + // forbid the switch from sending another. return ErrAlreadyPaid - } - return nil + default: + return ErrUnknownPaymentStatus + } } -// Success proceed status changing of payment to next successful status +// Success transitions an InFlight payment to Completed, otherwise it returns an +// error. After calling Success, ClearForTakeoff should prevent any further +// attempts for the same payment hash. func (p *paymentControl) Success(paymentHash [32]byte) error { p.mx.Lock() defer p.mx.Unlock() @@ -97,22 +118,29 @@ func (p *paymentControl) Success(paymentHash [32]byte) error { switch paymentStatus { case channeldb.StatusGrounded: - // Payment isn't initiated but received. + // Our records show the payment as still being grounded, meaning + // it never should have left the switch. return ErrPaymentNotInitiated case channeldb.StatusInFlight: - // Successful transition from InFlight transition to Completed. + // A successful response was received for an InFlight payment, + // mark it as completed to prevent sending to this payment hash + // again. return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusCompleted) case channeldb.StatusCompleted: - // Payment is completed before in should be ignored. + // The payment was completed previously, alert the caller that + // this may be a duplicate call. return ErrPaymentAlreadyCompleted - } - return nil + default: + return ErrUnknownPaymentStatus + } } -// Fail proceed status changing of payment to initial status in case of failure +// Fail transitions an InFlight payment to Grounded, otherwise it returns an +// error. After calling Fail, ClearForTakeoff should fail any further attempts +// for the same payment hash. func (p *paymentControl) Fail(paymentHash [32]byte) error { p.mx.Lock() defer p.mx.Unlock() @@ -124,19 +152,22 @@ func (p *paymentControl) Fail(paymentHash [32]byte) error { switch paymentStatus { case channeldb.StatusGrounded: - // Unpredictable behavior when payment wasn't transited to - // StatusInFlight status and was failed. + // Our records show the payment as still being grounded, meaning + // it never should have left the switch. return ErrPaymentNotInitiated case channeldb.StatusInFlight: - // If payment wasn't processed by some reason should return to - // default status to unlock retrying option for the same payment hash. + // A failed response was received for an InFlight payment, mark + // it as Grounded again to allow subsequent attempts. return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusGrounded) case channeldb.StatusCompleted: - // Payment is completed before and can't be moved to another status. + // The payment was completed previously, and we are now + // reporting that it has failed. Leave the status as completed, + // but alert the user that something is wrong. return ErrPaymentAlreadyCompleted - } - return nil + default: + return ErrUnknownPaymentStatus + } } From 3f0dfd4e4bb3878c633f989404a78edd8f69b508 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:01:24 -0700 Subject: [PATCH 20/32] htlcswitch/switch_control_test: extend tests + godocs --- htlcswitch/switch_control_test.go | 129 +++++++++++++++--------------- 1 file changed, 66 insertions(+), 63 deletions(-) diff --git a/htlcswitch/switch_control_test.go b/htlcswitch/switch_control_test.go index ec109718..021ef1da 100644 --- a/htlcswitch/switch_control_test.go +++ b/htlcswitch/switch_control_test.go @@ -24,9 +24,10 @@ func genHtlc() (*lnwire.UpdateAddHTLC, error) { return htlc, nil } -// TestPaymentControlSwitch checks the ability of payment control -// change states of payments -func TestPaymentControlSwitch(t *testing.T) { +// TestPaymentControlSwitchFail checks that payment status returns to Grounded +// status after failing, and that ClearForTakeoff allows another HTLC for the +// same payment hash. +func TestPaymentControlSwitchFail(t *testing.T) { t.Parallel() db, err := initDB() @@ -41,10 +42,40 @@ func TestPaymentControlSwitch(t *testing.T) { t.Fatalf("unable to generate htlc message: %v", err) } - // Sends base htlc message which initiate base status - // and move it to StatusInFlight and verifies that it - // was changed. - if err := pControl.CheckSend(htlc); err != nil { + // Sends base htlc message which initiate StatusInFlight. + if err := pControl.ClearForTakeoff(htlc); err != nil { + t.Fatalf("unable to send htlc message: %v", err) + } + + pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusInFlight { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusInFlight, pStatus) + } + + // Fail the payment, which should moved it to Grounded. + if err := pControl.Fail(htlc.PaymentHash); err != nil { + t.Fatalf("unable to fail payment hash: %v", err) + } + + // Verify the status is indeed Grounded. + pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusGrounded { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusGrounded, pStatus) + } + + // Sends the htlc again, which should succeed since the prior payment + // failed. + if err := pControl.ClearForTakeoff(htlc); err != nil { t.Fatalf("unable to send htlc message: %v", err) } @@ -72,56 +103,16 @@ func TestPaymentControlSwitch(t *testing.T) { t.Fatalf("payment status mismatch: expected %v, got %v", channeldb.StatusCompleted, pStatus) } -} -// TestPaymentControlSwitchFail checks that payment status returns -// to initial status after fail -func TestPaymentControlSwitchFail(t *testing.T) { - t.Parallel() - - db, err := initDB() - if err != nil { - t.Fatalf("unable to init db: %v", err) - } - - pControl := NewPaymentControl(db) - - htlc, err := genHtlc() - if err != nil { - t.Fatalf("unable to generate htlc message: %v", err) - } - - // Sends base htlc message which initiate StatusInFlight. - if err := pControl.CheckSend(htlc); err != nil { + // Attempt a final payment, which should now fail since the prior + // payment succeed. + if err := pControl.ClearForTakeoff(htlc); err != nil { t.Fatalf("unable to send htlc message: %v", err) } - - pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusInFlight { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusInFlight, pStatus) - } - - // Move payment to completed status, second payment should return error. - pControl.Fail(htlc.PaymentHash) - - pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusGrounded { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusGrounded, pStatus) - } } -// TestPaymentControlSwitchDoubleSend checks the ability of payment control -// to prevent double sending of htlc message, when message is in StatusInFlight +// TestPaymentControlSwitchDoubleSend checks the ability of payment control to +// prevent double sending of htlc message, when message is in StatusInFlight. func TestPaymentControlSwitchDoubleSend(t *testing.T) { t.Parallel() @@ -137,10 +128,9 @@ func TestPaymentControlSwitchDoubleSend(t *testing.T) { t.Fatalf("unable to generate htlc message: %v", err) } - // Sends base htlc message which initiate base status - // and move it to StatusInFlight and verifies that it - // was changed. - if err := pControl.CheckSend(htlc); err != nil { + // Sends base htlc message which initiate base status and move it to + // StatusInFlight and verifies that it was changed. + if err := pControl.ClearForTakeoff(htlc); err != nil { t.Fatalf("unable to send htlc message: %v", err) } @@ -154,16 +144,17 @@ func TestPaymentControlSwitchDoubleSend(t *testing.T) { channeldb.StatusInFlight, pStatus) } - // Tries to initiate double sending of htlc message with the same - // payment hash. - if err := pControl.CheckSend(htlc); err != ErrPaymentInFlight { + // Try to initiate double sending of htlc message with the same + // payment hash, should result in error indicating that payment has + // already been sent. + if err := pControl.ClearForTakeoff(htlc); err != ErrPaymentInFlight { t.Fatalf("payment control wrong behaviour: " + "double sending must trigger ErrPaymentInFlight error") } } -// TestPaymentControlSwitchDoublePay checks the ability of payment control -// to prevent double payment +// TestPaymentControlSwitchDoublePay checks the ability of payment control to +// prevent double payment. func TestPaymentControlSwitchDoublePay(t *testing.T) { t.Parallel() @@ -180,10 +171,11 @@ func TestPaymentControlSwitchDoublePay(t *testing.T) { } // Sends base htlc message which initiate StatusInFlight. - if err := pControl.CheckSend(htlc); err != nil { + if err := pControl.ClearForTakeoff(htlc); err != nil { t.Fatalf("unable to send htlc message: %v", err) } + // Verify that payment is InFlight. pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) if err != nil { t.Fatalf("unable to fetch payment status: %v", err) @@ -199,7 +191,18 @@ func TestPaymentControlSwitchDoublePay(t *testing.T) { t.Fatalf("error shouldn't have been received, got: %v", err) } - if err := pControl.CheckSend(htlc); err != ErrAlreadyPaid { + // Verify that payment is Completed. + pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != channeldb.StatusCompleted { + t.Fatalf("payment status mismatch: expected %v, got %v", + channeldb.StatusCompleted, pStatus) + } + + if err := pControl.ClearForTakeoff(htlc); err != ErrAlreadyPaid { t.Fatalf("payment control wrong behaviour:" + " double payment must trigger ErrAlreadyPaid") } From 8b2237fc1d1851a224c6d62b2e7201a028b41d5d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:02:31 -0700 Subject: [PATCH 21/32] htlcswitch/control_tower: move from switch_control --- htlcswitch/{switch_control.go => control_tower.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename htlcswitch/{switch_control.go => control_tower.go} (100%) diff --git a/htlcswitch/switch_control.go b/htlcswitch/control_tower.go similarity index 100% rename from htlcswitch/switch_control.go rename to htlcswitch/control_tower.go From 971ae3c7442f4b7144f1716f39430d501fe8b847 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 10 Aug 2018 14:02:50 -0700 Subject: [PATCH 22/32] htlcswitch/control_tower_test: move from switch_control_test --- htlcswitch/{switch_control_test.go => control_tower_test.go} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename htlcswitch/{switch_control_test.go => control_tower_test.go} (97%) diff --git a/htlcswitch/switch_control_test.go b/htlcswitch/control_tower_test.go similarity index 97% rename from htlcswitch/switch_control_test.go rename to htlcswitch/control_tower_test.go index 021ef1da..cf0258dd 100644 --- a/htlcswitch/switch_control_test.go +++ b/htlcswitch/control_tower_test.go @@ -79,7 +79,7 @@ func TestPaymentControlSwitchFail(t *testing.T) { t.Fatalf("unable to send htlc message: %v", err) } - pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) + pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) if err != nil { t.Fatalf("unable to fetch payment status: %v", err) } @@ -106,7 +106,7 @@ func TestPaymentControlSwitchFail(t *testing.T) { // Attempt a final payment, which should now fail since the prior // payment succeed. - if err := pControl.ClearForTakeoff(htlc); err != nil { + if err := pControl.ClearForTakeoff(htlc); err != ErrAlreadyPaid { t.Fatalf("unable to send htlc message: %v", err) } } From 98d2ffbfd04c6348214872409772439af811118d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 13 Aug 2018 18:46:58 -0700 Subject: [PATCH 23/32] htlcswitch/control_tower: add strict mode toggling --- htlcswitch/control_tower.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/htlcswitch/control_tower.go b/htlcswitch/control_tower.go index a0492c34..6c04f992 100644 --- a/htlcswitch/control_tower.go +++ b/htlcswitch/control_tower.go @@ -57,15 +57,17 @@ type ControlTower interface { // paymentControl is persistent implementation of ControlTower to restrict // double payment sending. type paymentControl struct { - mx sync.Mutex + strict bool + mx sync.Mutex db *channeldb.DB } // NewPaymentControl creates a new instance of the paymentControl. -func NewPaymentControl(db *channeldb.DB) ControlTower { +func NewPaymentControl(strict bool, db *channeldb.DB) ControlTower { return &paymentControl{ - db: db, + strict: strict, + db: db, } } @@ -116,19 +118,25 @@ func (p *paymentControl) Success(paymentHash [32]byte) error { return err } - switch paymentStatus { - case channeldb.StatusGrounded: + switch { + + case paymentStatus == channeldb.StatusGrounded && p.strict: // Our records show the payment as still being grounded, meaning // it never should have left the switch. return ErrPaymentNotInitiated - case channeldb.StatusInFlight: + case paymentStatus == channeldb.StatusGrounded && !p.strict: + // Our records show the payment as still being grounded, meaning + // it never should have left the switch. + fallthrough + + case paymentStatus == channeldb.StatusInFlight: // A successful response was received for an InFlight payment, // mark it as completed to prevent sending to this payment hash // again. return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusCompleted) - case channeldb.StatusCompleted: + case paymentStatus == channeldb.StatusCompleted: // The payment was completed previously, alert the caller that // this may be a duplicate call. return ErrPaymentAlreadyCompleted @@ -150,18 +158,24 @@ func (p *paymentControl) Fail(paymentHash [32]byte) error { return err } - switch paymentStatus { - case channeldb.StatusGrounded: + switch { + + case paymentStatus == channeldb.StatusGrounded && p.strict: // Our records show the payment as still being grounded, meaning // it never should have left the switch. return ErrPaymentNotInitiated - case channeldb.StatusInFlight: + case paymentStatus == channeldb.StatusGrounded && !p.strict: + // Our records show the payment as still being grounded, meaning + // it never should have left the switch. + fallthrough + + case paymentStatus == channeldb.StatusInFlight: // A failed response was received for an InFlight payment, mark // it as Grounded again to allow subsequent attempts. return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusGrounded) - case channeldb.StatusCompleted: + case paymentStatus == channeldb.StatusCompleted: // The payment was completed previously, and we are now // reporting that it has failed. Leave the status as completed, // but alert the user that something is wrong. From 2027444a56995606371e4c2458b8c521c5e37a9c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 13 Aug 2018 18:47:19 -0700 Subject: [PATCH 24/32] htlcswitch/control_tower_test: test strict and non-strict ctrltwrs --- htlcswitch/control_tower_test.go | 284 +++++++++++++++++++++++-------- 1 file changed, 213 insertions(+), 71 deletions(-) diff --git a/htlcswitch/control_tower_test.go b/htlcswitch/control_tower_test.go index cf0258dd..2728e362 100644 --- a/htlcswitch/control_tower_test.go +++ b/htlcswitch/control_tower_test.go @@ -24,10 +24,62 @@ func genHtlc() (*lnwire.UpdateAddHTLC, error) { return htlc, nil } -// TestPaymentControlSwitchFail checks that payment status returns to Grounded +type paymentControlTestCase func(*testing.T, bool) + +var paymentControlTests = []struct { + name string + strict bool + testcase paymentControlTestCase +}{ + { + name: "fail-strict", + strict: true, + testcase: testPaymentControlSwitchFail, + }, + { + name: "double-send-strict", + strict: true, + testcase: testPaymentControlSwitchDoubleSend, + }, + { + name: "double-pay-strict", + strict: true, + testcase: testPaymentControlSwitchDoublePay, + }, + { + name: "fail-not-strict", + strict: false, + testcase: testPaymentControlSwitchFail, + }, + { + name: "double-send-not-strict", + strict: false, + testcase: testPaymentControlSwitchDoubleSend, + }, + { + name: "double-pay-not-strict", + strict: false, + testcase: testPaymentControlSwitchDoublePay, + }, +} + +// TestPaymentControls runs a set of common tests against both the strict and +// non-strict payment control instances. This ensures that the two both behave +// identically when making the expected state-transitions of the stricter +// implementation. Behavioral differences in the strict and non-strict +// implementations are tested separately. +func TestPaymentControls(t *testing.T) { + for _, test := range paymentControlTests { + t.Run(test.name, func(t *testing.T) { + test.testcase(t, test.strict) + }) + } +} + +// testPaymentControlSwitchFail checks that payment status returns to Grounded // status after failing, and that ClearForTakeoff allows another HTLC for the // same payment hash. -func TestPaymentControlSwitchFail(t *testing.T) { +func testPaymentControlSwitchFail(t *testing.T, strict bool) { t.Parallel() db, err := initDB() @@ -35,7 +87,7 @@ func TestPaymentControlSwitchFail(t *testing.T) { t.Fatalf("unable to init db: %v", err) } - pControl := NewPaymentControl(db) + pControl := NewPaymentControl(strict, db) htlc, err := genHtlc() if err != nil { @@ -47,15 +99,7 @@ func TestPaymentControlSwitchFail(t *testing.T) { t.Fatalf("unable to send htlc message: %v", err) } - pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusInFlight { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusInFlight, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusInFlight) // Fail the payment, which should moved it to Grounded. if err := pControl.Fail(htlc.PaymentHash); err != nil { @@ -63,15 +107,7 @@ func TestPaymentControlSwitchFail(t *testing.T) { } // Verify the status is indeed Grounded. - pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusGrounded { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusGrounded, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusGrounded) // Sends the htlc again, which should succeed since the prior payment // failed. @@ -79,30 +115,14 @@ func TestPaymentControlSwitchFail(t *testing.T) { t.Fatalf("unable to send htlc message: %v", err) } - pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusInFlight { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusInFlight, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusInFlight) // Verifies that status was changed to StatusCompleted. if err := pControl.Success(htlc.PaymentHash); err != nil { t.Fatalf("error shouldn't have been received, got: %v", err) } - pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusCompleted { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusCompleted, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusCompleted) // Attempt a final payment, which should now fail since the prior // payment succeed. @@ -111,9 +131,9 @@ func TestPaymentControlSwitchFail(t *testing.T) { } } -// TestPaymentControlSwitchDoubleSend checks the ability of payment control to +// testPaymentControlSwitchDoubleSend checks the ability of payment control to // prevent double sending of htlc message, when message is in StatusInFlight. -func TestPaymentControlSwitchDoubleSend(t *testing.T) { +func testPaymentControlSwitchDoubleSend(t *testing.T, strict bool) { t.Parallel() db, err := initDB() @@ -121,7 +141,7 @@ func TestPaymentControlSwitchDoubleSend(t *testing.T) { t.Fatalf("unable to init db: %v", err) } - pControl := NewPaymentControl(db) + pControl := NewPaymentControl(strict, db) htlc, err := genHtlc() if err != nil { @@ -134,15 +154,7 @@ func TestPaymentControlSwitchDoubleSend(t *testing.T) { t.Fatalf("unable to send htlc message: %v", err) } - pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusInFlight { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusInFlight, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusInFlight) // Try to initiate double sending of htlc message with the same // payment hash, should result in error indicating that payment has @@ -155,7 +167,7 @@ func TestPaymentControlSwitchDoubleSend(t *testing.T) { // TestPaymentControlSwitchDoublePay checks the ability of payment control to // prevent double payment. -func TestPaymentControlSwitchDoublePay(t *testing.T) { +func testPaymentControlSwitchDoublePay(t *testing.T, strict bool) { t.Parallel() db, err := initDB() @@ -163,7 +175,7 @@ func TestPaymentControlSwitchDoublePay(t *testing.T) { t.Fatalf("unable to init db: %v", err) } - pControl := NewPaymentControl(db) + pControl := NewPaymentControl(strict, db) htlc, err := genHtlc() if err != nil { @@ -176,15 +188,7 @@ func TestPaymentControlSwitchDoublePay(t *testing.T) { } // Verify that payment is InFlight. - pStatus, err := db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusInFlight { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusInFlight, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusInFlight) // Move payment to completed status, second payment should return error. if err := pControl.Success(htlc.PaymentHash); err != nil { @@ -192,18 +196,156 @@ func TestPaymentControlSwitchDoublePay(t *testing.T) { } // Verify that payment is Completed. - pStatus, err = db.FetchPaymentStatus(htlc.PaymentHash) - if err != nil { - t.Fatalf("unable to fetch payment status: %v", err) - } - - if pStatus != channeldb.StatusCompleted { - t.Fatalf("payment status mismatch: expected %v, got %v", - channeldb.StatusCompleted, pStatus) - } + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusCompleted) if err := pControl.ClearForTakeoff(htlc); err != ErrAlreadyPaid { t.Fatalf("payment control wrong behaviour:" + " double payment must trigger ErrAlreadyPaid") } } + +// TestPaymentControlNonStrictSuccessesWithoutInFlight checks that a non-strict +// payment control will allow calls to Success when no payment is in flight. This +// is necessary to gracefully handle the case in which the switch already sent +// out a payment for a particular payment hash in a prior db version that didn't +// have payment statuses. +func TestPaymentControlNonStrictSuccessesWithoutInFlight(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(false, db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + if err := pControl.Success(htlc.PaymentHash); err != nil { + t.Fatalf("unable to mark payment hash success: %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusCompleted) + + err = pControl.Success(htlc.PaymentHash) + if err != ErrPaymentAlreadyCompleted { + t.Fatalf("unable to remark payment hash failed: %v", err) + } +} + +// TestPaymentControlNonStrictFailsWithoutInFlight checks that a non-strict +// payment control will allow calls to Fail when no payment is in flight. This +// is necessary to gracefully handle the case in which the switch already sent +// out a payment for a particular payment hash in a prior db version that didn't +// have payment statuses. +func TestPaymentControlNonStrictFailsWithoutInFlight(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(false, db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + if err := pControl.Fail(htlc.PaymentHash); err != nil { + t.Fatalf("unable to mark payment hash failed: %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusGrounded) + + err = pControl.Fail(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to remark payment hash failed: %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusGrounded) + + err = pControl.Success(htlc.PaymentHash) + if err != nil { + t.Fatalf("unable to remark payment hash success: %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusCompleted) + + err = pControl.Fail(htlc.PaymentHash) + if err != ErrPaymentAlreadyCompleted { + t.Fatalf("unable to remark payment hash failed: %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusCompleted) +} + +// TestPaymentControlStrictSuccessesWithoutInFlight checks that a strict payment +// control will disallow calls to Success when no payment is in flight. +func TestPaymentControlStrictSuccessesWithoutInFlight(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(true, db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + err = pControl.Success(htlc.PaymentHash) + if err != ErrPaymentNotInitiated { + t.Fatalf("expected ErrPaymentNotInitiated, got %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusGrounded) +} + +// TestPaymentControlStrictFailsWithoutInFlight checks that a strict payment +// control will disallow calls to Fail when no payment is in flight. +func TestPaymentControlStrictFailsWithoutInFlight(t *testing.T) { + t.Parallel() + + db, err := initDB() + if err != nil { + t.Fatalf("unable to init db: %v", err) + } + + pControl := NewPaymentControl(true, db) + + htlc, err := genHtlc() + if err != nil { + t.Fatalf("unable to generate htlc message: %v", err) + } + + err = pControl.Fail(htlc.PaymentHash) + if err != ErrPaymentNotInitiated { + t.Fatalf("expected ErrPaymentNotInitiated, got %v", err) + } + + assertPaymentStatus(t, db, htlc.PaymentHash, channeldb.StatusGrounded) +} + +func assertPaymentStatus(t *testing.T, db *channeldb.DB, + hash [32]byte, expStatus channeldb.PaymentStatus) { + + t.Helper() + + pStatus, err := db.FetchPaymentStatus(hash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if pStatus != expStatus { + t.Fatalf("payment status mismatch: expected %v, got %v", + expStatus, pStatus) + } +} From 2dd8f07014a4cb6b1e55ad2f5bcc0d1d80e5620e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 13 Aug 2018 18:49:22 -0700 Subject: [PATCH 25/32] htlcswitch/switch: use non-strict PaymentControl --- htlcswitch/switch.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index d6e6a46a..c0cb310b 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -290,7 +290,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) { cfg: &cfg, circuits: circuitMap, paymentSequencer: sequencer, - control: NewPaymentControl(cfg.DB), + control: NewPaymentControl(false, cfg.DB), linkIndex: make(map[lnwire.ChannelID]ChannelLink), mailOrchestrator: newMailOrchestrator(), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), @@ -852,7 +852,8 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { // Persistently mark that a payment to this payment hash // succeeded. This will prevent us from ever making another // payment to this hash. - if err := s.control.Success(pkt.circuit.PaymentHash); err != nil { + err := s.control.Success(pkt.circuit.PaymentHash) + if err != nil && err != ErrPaymentAlreadyCompleted { return err } @@ -864,7 +865,8 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { // Persistently mark that a payment to this payment hash failed. // This will permit us to make another attempt at a successful // payment. - if err := s.control.Fail(pkt.circuit.PaymentHash); err != nil { + err := s.control.Fail(pkt.circuit.PaymentHash) + if err != nil && err != ErrPaymentAlreadyCompleted { return err } From 11bb5685f975c889eca09a83f2925e8bc6494d53 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 13 Aug 2018 20:38:30 -0700 Subject: [PATCH 26/32] channeldb/migrations: mark locally-sourced payments as InFlight... by reading the payment hash from the circuit map. --- channeldb/migrations.go | 49 ++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 5a127041..c7beb638 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "crypto/sha256" + "encoding/binary" "fmt" "github.com/coreos/bbolt" @@ -379,12 +380,6 @@ func migrateEdgePolicies(tx *bolt.Tx) error { // statuses for each existing payment entity in bucket to be able control // transitions of statuses and prevent cases such as double payment func paymentStatusesMigration(tx *bolt.Tx) error { - // Get the bucket dedicated to storing payments - bucket := tx.Bucket(paymentBucket) - if bucket == nil { - return nil - } - // Get the bucket dedicated to storing statuses of payments, // where a key is payment hash, value is payment status. paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) @@ -392,8 +387,46 @@ func paymentStatusesMigration(tx *bolt.Tx) error { return err } - log.Infof("Migrating database to support payment statuses -- " + - "marking all existing payments with status Completed") + log.Infof("Migrating database to support payment statuses") + + circuitAddKey := []byte("circuit-adds") + circuits := tx.Bucket(circuitAddKey) + if circuits != nil { + log.Infof("Marking all known circuits with status InFlight") + + err = circuits.ForEach(func(k, v []byte) error { + // Parse the first 8 bytes as the short chan ID for the + // circuit. We'll skip all short chan IDs are not + // locally initiated, which includes all non-zero short + // chan ids. + chanID := binary.BigEndian.Uint64(k[:8]) + if chanID != 0 { + return nil + } + + // The payment hash is the third item in the serialized + // payment circuit. The first two items are an AddRef + // (10 bytes) and the incoming circuit key (16 bytes). + const payHashOffset = 10 + 16 + + paymentHash := v[payHashOffset : payHashOffset+32] + + return paymentStatuses.Put( + paymentHash[:], StatusInFlight.Bytes(), + ) + }) + if err != nil { + return err + } + } + + log.Infof("Marking all existing payments with status Completed") + + // Get the bucket dedicated to storing payments + bucket := tx.Bucket(paymentBucket) + if bucket == nil { + return nil + } // For each payment in the bucket, deserialize the payment and mark it // as completed. From 0865ac7cf6fcfb68639a41d7041b77de9e1c8b08 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 13 Aug 2018 20:39:16 -0700 Subject: [PATCH 27/32] channeldb/migrations_test: assert locally-sourced circuits... in the circuit map are marked StatusInFlight. We also check that hashes contained in forwarded circuits are not updated. --- channeldb/migrations_test.go | 118 +++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index 7a1cc1ad..6fbefd0c 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -2,7 +2,10 @@ package channeldb import ( "crypto/sha256" + "encoding/binary" "testing" + + "github.com/coreos/bbolt" ) // TestPaymentStatusesMigration checks that already completed payments will have @@ -40,6 +43,82 @@ func TestPaymentStatusesMigration(t *testing.T) { t.Fatalf("wrong payment status: expected %v, got %v", StatusGrounded.String(), paymentStatus.String()) } + + // Lastly, we'll add a locally-sourced circuit and + // non-locally-sourced circuit to the circuit map. The + // locally-sourced payment should end up with an InFlight + // status, while the other should remain unchanged, which + // defaults to Grounded. + err = d.Update(func(tx *bolt.Tx) error { + circuits, err := tx.CreateBucketIfNotExists( + []byte("circuit-adds"), + ) + if err != nil { + return err + } + + groundedKey := make([]byte, 16) + binary.BigEndian.PutUint64(groundedKey[:8], 1) + binary.BigEndian.PutUint64(groundedKey[8:], 1) + + // Generated using TestHalfCircuitSerialization with nil + // ErrorEncrypter, which is the case for locally-sourced + // payments. No payment status should end up being set + // for this circuit, since the short channel id of the + // key is non-zero (e.g., a forwarded circuit). This + // will default it to Grounded. + groundedCircuit := []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, + // start payment hash + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // end payment hash + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, + 0x42, 0x40, 0x00, + } + + err = circuits.Put(groundedKey, groundedCircuit) + if err != nil { + return err + } + + inFlightKey := make([]byte, 16) + binary.BigEndian.PutUint64(inFlightKey[:8], 0) + binary.BigEndian.PutUint64(inFlightKey[8:], 1) + + // Generated using TestHalfCircuitSerialization with nil + // ErrorEncrypter, which is not the case for forwarded + // payments, but should have no impact on the + // correctness of the test. The payment status for this + // circuit should be set to InFlight, since the short + // channel id in the key is 0 (sourceHop). + inFlightCircuit := []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, + // start payment hash + 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // end payment hash + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, + 0x42, 0x40, 0x00, + } + + return circuits.Put(inFlightKey, inFlightCircuit) + }) + if err != nil { + t.Fatalf("unable to add circuit map entry: %v", err) + } } // Verify that the created payment status is "Completed" for our one @@ -54,6 +133,7 @@ func TestPaymentStatusesMigration(t *testing.T) { t.Fatal("migration 'paymentStatusesMigration' wasn't applied") } + // Check that our completed payments were migrated. paymentStatus, err := d.FetchPaymentStatus(paymentHash) if err != nil { t.Fatalf("unable to fetch payment status: %v", err) @@ -63,6 +143,44 @@ func TestPaymentStatusesMigration(t *testing.T) { t.Fatalf("wrong payment status: expected %v, got %v", StatusCompleted.String(), paymentStatus.String()) } + + inFlightHash := [32]byte{ + 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + + // Check that the locally sourced payment was transitioned to + // InFlight. + paymentStatus, err = d.FetchPaymentStatus(inFlightHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if paymentStatus != StatusInFlight { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusInFlight.String(), paymentStatus.String()) + } + + groundedHash := [32]byte{ + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + + // Check that non-locally sourced payments remain in the default + // Grounded state. + paymentStatus, err = d.FetchPaymentStatus(groundedHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if paymentStatus != StatusGrounded { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusGrounded.String(), paymentStatus.String()) + } } applyMigration(t, From 86b347c996f75f36b263a595509e649a3bfdf26a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Aug 2018 21:13:06 -0700 Subject: [PATCH 28/32] channeldb/payments: make payment status helper methods This commit splits FetchPaymentStatus and UpdatePaymentStatus, such that they each invoke helper methods that can be composed into different db txns. This enables us to improve performance on send/receive, as we can remove the exclusive lock from the control tower, and allow concurrent calls to utilize Batch more effectively. --- channeldb/payments.go | 63 +++++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/channeldb/payments.go b/channeldb/payments.go index d8d1a856..7d32f20c 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -191,33 +191,33 @@ func (db *DB) DeleteAllPayments() error { // local database. func (db *DB) UpdatePaymentStatus(paymentHash [32]byte, status PaymentStatus) error { return db.Batch(func(tx *bolt.Tx) error { - paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) - if err != nil { - return err - } - - return paymentStatuses.Put(paymentHash[:], status.Bytes()) + return UpdatePaymentStatusTx(tx, paymentHash, status) }) } +// UpdatePaymentStatusTx is a helper method that sets the payment status for +// outgoing/finished payments in the local database. This method accepts a +// boltdb transaction such that the operation can be composed into other +// database transactions. +func UpdatePaymentStatusTx(tx *bolt.Tx, + paymentHash [32]byte, status PaymentStatus) error { + + paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) + if err != nil { + return err + } + + return paymentStatuses.Put(paymentHash[:], status.Bytes()) +} + // FetchPaymentStatus returns the payment status for outgoing payment. // If status of the payment isn't found, it will default to "StatusGrounded". func (db *DB) FetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) { - // The default status for all payments that aren't recorded in database. - paymentStatus := StatusGrounded - + var paymentStatus = StatusGrounded err := db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket(paymentStatusBucket) - if bucket == nil { - return nil - } - - paymentStatusBytes := bucket.Get(paymentHash[:]) - if paymentStatusBytes == nil { - return nil - } - - return paymentStatus.FromBytes(paymentStatusBytes) + var err error + paymentStatus, err = FetchPaymentStatusTx(tx, paymentHash) + return err }) if err != nil { return StatusGrounded, err @@ -226,6 +226,29 @@ func (db *DB) FetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) { return paymentStatus, nil } +// FetchPaymentStatusTx is a helper method that returns the payment status for +// outgoing payment. If status of the payment isn't found, it will default to +// "StatusGrounded". It accepts the boltdb transactions such that this method +// can be composed into other atomic operations. +func FetchPaymentStatusTx(tx *bolt.Tx, paymentHash [32]byte) (PaymentStatus, error) { + // The default status for all payments that aren't recorded in database. + var paymentStatus = StatusGrounded + + bucket := tx.Bucket(paymentStatusBucket) + if bucket == nil { + return paymentStatus, nil + } + + paymentStatusBytes := bucket.Get(paymentHash[:]) + if paymentStatusBytes == nil { + return paymentStatus, nil + } + + paymentStatus.FromBytes(paymentStatusBytes) + + return paymentStatus, nil +} + func serializeOutgoingPayment(w io.Writer, p *OutgoingPayment) error { var scratch [8]byte From 5dc2a4a4b8b02fa3d77e2280c366d0c0ea38306e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Aug 2018 21:14:52 -0700 Subject: [PATCH 29/32] htlcswitch/control_tower: use one db txn for transitions Composes the new payment status helper methods such that we only require one db txn per state transition. This also allows us to remove the exclusive lock from the control tower, and enable more concurrent requests. --- htlcswitch/control_tower.go | 230 ++++++++++++++++++++++-------------- 1 file changed, 144 insertions(+), 86 deletions(-) diff --git a/htlcswitch/control_tower.go b/htlcswitch/control_tower.go index 6c04f992..47b5bd3d 100644 --- a/htlcswitch/control_tower.go +++ b/htlcswitch/control_tower.go @@ -2,8 +2,8 @@ package htlcswitch import ( "errors" - "sync" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -59,11 +59,17 @@ type ControlTower interface { type paymentControl struct { strict bool - mx sync.Mutex db *channeldb.DB } -// NewPaymentControl creates a new instance of the paymentControl. +// NewPaymentControl creates a new instance of the paymentControl. The strict +// flag indicates whether the controller should require "strict" state +// transitions, which would be otherwise intolerant to older databases that may +// already have duplicate payments to the same payment hash. It should be +// enabled only after sufficient checks have been made to ensure the db does not +// contain such payments. In the meantime, non-strict mode enforces a superset +// of the state transitions that prevent additional payments to a given payment +// hash from being added. func NewPaymentControl(strict bool, db *channeldb.DB) ControlTower { return &paymentControl{ strict: strict, @@ -74,114 +80,166 @@ func NewPaymentControl(strict bool, db *channeldb.DB) ControlTower { // ClearForTakeoff checks that we don't already have an InFlight or Completed // payment identified by the same payment hash. func (p *paymentControl) ClearForTakeoff(htlc *lnwire.UpdateAddHTLC) error { - p.mx.Lock() - defer p.mx.Unlock() + var takeoffErr error + err := p.db.Batch(func(tx *bolt.Tx) error { + // Retrieve current status of payment from local database. + paymentStatus, err := channeldb.FetchPaymentStatusTx( + tx, htlc.PaymentHash, + ) + if err != nil { + return err + } - // Retrieve current status of payment from local database. - paymentStatus, err := p.db.FetchPaymentStatus(htlc.PaymentHash) + // Reset the takeoff error, to avoid carrying over an error + // from a previous execution of the batched db transaction. + takeoffErr = nil + + switch paymentStatus { + + case channeldb.StatusGrounded: + // It is safe to reattempt a payment if we know that we + // haven't left one in flight. Since this one is + // grounded, Transition the payment status to InFlight + // to prevent others. + return channeldb.UpdatePaymentStatusTx( + tx, htlc.PaymentHash, channeldb.StatusInFlight, + ) + + case channeldb.StatusInFlight: + // We already have an InFlight payment on the network. We will + // disallow any more payment until a response is received. + takeoffErr = ErrPaymentInFlight + + case channeldb.StatusCompleted: + // We've already completed a payment to this payment hash, + // forbid the switch from sending another. + takeoffErr = ErrAlreadyPaid + + default: + takeoffErr = ErrUnknownPaymentStatus + } + + return nil + }) if err != nil { return err } - switch paymentStatus { - - case channeldb.StatusGrounded: - // It is safe to reattempt a payment if we know that we haven't - // left one in flight. Since this one is grounded, Transition - // the payment status to InFlight to prevent others. - return p.db.UpdatePaymentStatus(htlc.PaymentHash, channeldb.StatusInFlight) - - case channeldb.StatusInFlight: - // We already have an InFlight payment on the network. We will - // disallow any more payment until a response is received. - return ErrPaymentInFlight - - case channeldb.StatusCompleted: - // We've already completed a payment to this payment hash, - // forbid the switch from sending another. - return ErrAlreadyPaid - - default: - return ErrUnknownPaymentStatus - } + return takeoffErr } // Success transitions an InFlight payment to Completed, otherwise it returns an // error. After calling Success, ClearForTakeoff should prevent any further // attempts for the same payment hash. func (p *paymentControl) Success(paymentHash [32]byte) error { - p.mx.Lock() - defer p.mx.Unlock() + var updateErr error + err := p.db.Batch(func(tx *bolt.Tx) error { + paymentStatus, err := channeldb.FetchPaymentStatusTx( + tx, paymentHash, + ) + if err != nil { + return err + } - paymentStatus, err := p.db.FetchPaymentStatus(paymentHash) + // Reset the update error, to avoid carrying over an error + // from a previous execution of the batched db transaction. + updateErr = nil + + switch { + + case paymentStatus == channeldb.StatusGrounded && p.strict: + // Our records show the payment as still being grounded, + // meaning it never should have left the switch. + updateErr = ErrPaymentNotInitiated + + case paymentStatus == channeldb.StatusGrounded && !p.strict: + // Though our records show the payment as still being + // grounded, meaning it never should have left the + // switch, we permit this transition in non-strict mode + // to handle inconsistent db states. + fallthrough + + case paymentStatus == channeldb.StatusInFlight: + // A successful response was received for an InFlight + // payment, mark it as completed to prevent sending to + // this payment hash again. + return channeldb.UpdatePaymentStatusTx( + tx, paymentHash, channeldb.StatusCompleted, + ) + + case paymentStatus == channeldb.StatusCompleted: + // The payment was completed previously, alert the + // caller that this may be a duplicate call. + updateErr = ErrPaymentAlreadyCompleted + + default: + updateErr = ErrUnknownPaymentStatus + } + + return nil + }) if err != nil { return err } - switch { - - case paymentStatus == channeldb.StatusGrounded && p.strict: - // Our records show the payment as still being grounded, meaning - // it never should have left the switch. - return ErrPaymentNotInitiated - - case paymentStatus == channeldb.StatusGrounded && !p.strict: - // Our records show the payment as still being grounded, meaning - // it never should have left the switch. - fallthrough - - case paymentStatus == channeldb.StatusInFlight: - // A successful response was received for an InFlight payment, - // mark it as completed to prevent sending to this payment hash - // again. - return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusCompleted) - - case paymentStatus == channeldb.StatusCompleted: - // The payment was completed previously, alert the caller that - // this may be a duplicate call. - return ErrPaymentAlreadyCompleted - - default: - return ErrUnknownPaymentStatus - } + return updateErr } // Fail transitions an InFlight payment to Grounded, otherwise it returns an // error. After calling Fail, ClearForTakeoff should fail any further attempts // for the same payment hash. func (p *paymentControl) Fail(paymentHash [32]byte) error { - p.mx.Lock() - defer p.mx.Unlock() + var updateErr error + err := p.db.Batch(func(tx *bolt.Tx) error { + paymentStatus, err := channeldb.FetchPaymentStatusTx( + tx, paymentHash, + ) + if err != nil { + return err + } - paymentStatus, err := p.db.FetchPaymentStatus(paymentHash) + // Reset the update error, to avoid carrying over an error + // from a previous execution of the batched db transaction. + updateErr = nil + + switch { + + case paymentStatus == channeldb.StatusGrounded && p.strict: + // Our records show the payment as still being grounded, + // meaning it never should have left the switch. + updateErr = ErrPaymentNotInitiated + + case paymentStatus == channeldb.StatusGrounded && !p.strict: + // Though our records show the payment as still being + // grounded, meaning it never should have left the + // switch, we permit this transition in non-strict mode + // to handle inconsistent db states. + fallthrough + + case paymentStatus == channeldb.StatusInFlight: + // A failed response was received for an InFlight + // payment, mark it as Grounded again to allow + // subsequent attempts. + return channeldb.UpdatePaymentStatusTx( + tx, paymentHash, channeldb.StatusGrounded, + ) + + case paymentStatus == channeldb.StatusCompleted: + // The payment was completed previously, and we are now + // reporting that it has failed. Leave the status as + // completed, but alert the user that something is + // wrong. + updateErr = ErrPaymentAlreadyCompleted + + default: + updateErr = ErrUnknownPaymentStatus + } + + return nil + }) if err != nil { return err } - switch { - - case paymentStatus == channeldb.StatusGrounded && p.strict: - // Our records show the payment as still being grounded, meaning - // it never should have left the switch. - return ErrPaymentNotInitiated - - case paymentStatus == channeldb.StatusGrounded && !p.strict: - // Our records show the payment as still being grounded, meaning - // it never should have left the switch. - fallthrough - - case paymentStatus == channeldb.StatusInFlight: - // A failed response was received for an InFlight payment, mark - // it as Grounded again to allow subsequent attempts. - return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusGrounded) - - case paymentStatus == channeldb.StatusCompleted: - // The payment was completed previously, and we are now - // reporting that it has failed. Leave the status as completed, - // but alert the user that something is wrong. - return ErrPaymentAlreadyCompleted - - default: - return ErrUnknownPaymentStatus - } + return updateErr } From b59fea460a26b9747cd25ae93a48d2b847a593b1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Aug 2018 21:21:15 -0700 Subject: [PATCH 30/32] htlcswitch/switch: make local response handling async This commit moves the logic handling responses to locally-initiated payments to be asynchronous. The reordering of operations into handleLocalDispatch brings a serious performance burden to the switch's main event loop. However, the at-most once semantics of circuit map and idempotency of cleanup methods allows concurrent operations to run in parallel. Prior to this commit, the async_payments_benchmark would timeout due to the forcibly serial nature of the prior design. With this change, there is no perceptible difference in the benchmark OMM, even though we've added two extra db calls. --- htlcswitch/switch.go | 56 ++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index c0cb310b..4df57cec 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -805,10 +805,30 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { return link.HandleSwitchPacket(pkt) } - // Otherwise this is a response to a payment that we initiated. We'll - // clean up any fwdpkg references, circuit entries, and mark in our db - // that the payment for this payment hash has either succeeded or - // failed. + s.wg.Add(1) + go s.handleLocalResponse(pkt) + + return nil +} + +// handleLocalResponse processes a Settle or Fail responding to a +// locally-initiated payment. This is handled asynchronously to avoid blocking +// the main event loop within the switch, as these operations can require +// multiple db transactions. The guarantees of the circuit map are stringent +// enough such that we are able to tolerate reordering of these operations +// without side effects. The primary operations handled are: +// 1. Ack settle/fail references, to avoid resending this response internally +// 2. Teardown the closing circuit in the circuit map +// 3. Transition the payment status to grounded or completed. +// 4. Respond to an in-mem pending payment, if it is found. +// +// NOTE: This method MUST be spawned as a goroutine. +func (s *Switch) handleLocalResponse(pkt *htlcPacket) { + defer s.wg.Done() + + // First, we'll clean up any fwdpkg references, circuit entries, and + // mark in our db that the payment for this payment hash has either + // succeeded or failed. // // If this response is contained in a forwarding package, we'll start by // acking the settle/fail so that we don't continue to retransmit the @@ -817,7 +837,7 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { if err := s.ackSettleFail(*pkt.destRef); err != nil { log.Warnf("Unable to ack settle/fail reference: %s: %v", *pkt.destRef, err) - return err + return } } @@ -831,7 +851,7 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { if err := s.teardownCircuit(pkt); err != nil { log.Warnf("Unable to teardown circuit %s: %v", pkt.inKey(), err) - return err + return } // Locate the pending payment to notify the application that this @@ -854,7 +874,9 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { // payment to this hash. err := s.control.Success(pkt.circuit.PaymentHash) if err != nil && err != ErrPaymentAlreadyCompleted { - return err + log.Warnf("Unable to mark completed payment %x: %v", + pkt.circuit.PaymentHash, err) + return } preimage = htlc.PaymentPreimage @@ -867,13 +889,16 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { // payment. err := s.control.Fail(pkt.circuit.PaymentHash) if err != nil && err != ErrPaymentAlreadyCompleted { - return err + log.Warnf("Unable to ground payment %x: %v", + pkt.circuit.PaymentHash, err) + return } paymentErr = s.parseFailedPayment(payment, pkt, htlc) default: - return errors.New("wrong update type") + log.Warnf("Received unknown response type: %T", pkt.htlc) + return } // Deliver the payment error and preimage to the application, if it is @@ -883,8 +908,6 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { payment.preimage <- preimage s.removePendingPayment(pkt.incomingHTLCID) } - - return nil } // parseFailedPayment determines the appropriate failure message to return to @@ -2078,17 +2101,11 @@ func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) { // removePendingPayment is the helper function which removes the pending user // payment. -func (s *Switch) removePendingPayment(paymentID uint64) error { +func (s *Switch) removePendingPayment(paymentID uint64) { s.pendingMutex.Lock() defer s.pendingMutex.Unlock() - if _, ok := s.pendingPayments[paymentID]; !ok { - return fmt.Errorf("Cannot find pending payment with ID %d", - paymentID) - } - delete(s.pendingPayments, paymentID) - return nil } // findPayment is the helper function which find the payment. @@ -2115,6 +2132,9 @@ func (s *Switch) CircuitModifier() CircuitModifier { // numPendingPayments is helper function which returns the overall number of // pending user payments. func (s *Switch) numPendingPayments() int { + s.pendingMutex.RLock() + defer s.pendingMutex.RUnlock() + return len(s.pendingPayments) } From 80814cf11ded55c05d74ce637b05ca0c9b540e1a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Aug 2018 21:47:34 -0700 Subject: [PATCH 31/32] lnd_test: satisfy linter alignment --- lnd_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lnd_test.go b/lnd_test.go index 9d6b45e7..6b935a9f 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -3843,7 +3843,7 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) { } invoice := &lnrpc.Invoice{ - Memo: "testing", + Memo: "testing", RPreimage: preimage, Value: paymentAmt, } From 9c5c1d0cb5414db7115090fd8577ca3e7bd94486 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 21 Aug 2018 04:12:08 -0700 Subject: [PATCH 32/32] htlcswitch/switch: prevent panic for unknown error decryptor --- htlcswitch/switch.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 4df57cec..2e5248a4 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -46,6 +46,11 @@ var ( // txn. ErrIncompleteForward = errors.New("incomplete forward detected") + // ErrUnknownErrorDecryptor signals that we were unable to locate the + // error decryptor for this payment. This is likely due to restarting + // the daemon. + ErrUnknownErrorDecryptor = errors.New("unknown error decryptor") + // ErrSwitchExiting signaled when the switch has received a shutdown // request. ErrSwitchExiting = errors.New("htlcswitch shutting down") @@ -933,7 +938,7 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, if err != nil { userErr = fmt.Sprintf("unable to decode onion failure, "+ "htlc with hash(%x): %v", - payment.paymentHash[:], err) + pkt.circuit.PaymentHash[:], err) log.Error(userErr) // As this didn't even clear the link, we don't need to @@ -960,6 +965,18 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, FailureMessage: lnwire.FailPermanentChannelFailure{}, } + // If the provided payment is nil, we have discarded the error decryptor + // due to a restart. We'll return a fixed error and signal a temporary + // channel failure to the router. + case payment == nil: + userErr := fmt.Sprintf("error decryptor for payment " + + "could not be located, likely due to restart") + failure = &ForwardingError{ + ErrorSource: s.cfg.SelfKey, + ExtraMsg: userErr, + FailureMessage: lnwire.NewTemporaryChannelFailure(nil), + } + // A regular multi-hop payment error that we'll need to // decrypt. default: @@ -968,8 +985,9 @@ func (s *Switch) parseFailedPayment(payment *pendingPayment, pkt *htlcPacket, // error. If we're unable to then we'll bail early. failure, err = payment.deobfuscator.DecryptError(htlc.Reason) if err != nil { - userErr := fmt.Sprintf("unable to de-obfuscate onion failure, "+ - "htlc with hash(%x): %v", payment.paymentHash[:], err) + userErr := fmt.Sprintf("unable to de-obfuscate onion "+ + "failure, htlc with hash(%x): %v", + pkt.circuit.PaymentHash[:], err) log.Error(userErr) failure = &ForwardingError{ ErrorSource: s.cfg.SelfKey,