From 18698663c5c353f647c5bb3be096f773066fbabe Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 15 Jan 2019 10:42:25 +0100 Subject: [PATCH 01/11] lnhash: create Hash and Preimage types This commit adds new hash and preimage types. These types are similar to chainhash.Hash, except for that string representations are not reversed. The reason for adding dedicated types and not use [32]byte, is to facilitate logging (%v displays as hex string) and have standard methods to convert from byte slice and string with a length check. --- lntypes/hash.go | 49 ++++++++++++++++++++++++++++++++++++++++ lntypes/preimage.go | 55 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 lntypes/hash.go create mode 100644 lntypes/preimage.go diff --git a/lntypes/hash.go b/lntypes/hash.go new file mode 100644 index 00000000..32214489 --- /dev/null +++ b/lntypes/hash.go @@ -0,0 +1,49 @@ +package lntypes + +import ( + "encoding/hex" + "fmt" +) + +// HashSize of array used to store hashes. +const HashSize = 32 + +// Hash is used in several of the lightning messages and common structures. It +// typically represents a payment hash. +type Hash [HashSize]byte + +// String returns the Hash as a hexadecimal string. +func (hash Hash) String() string { + return hex.EncodeToString(hash[:]) +} + +// NewHash returns a new Hash from a byte slice. An error is returned if +// the number of bytes passed in is not HashSize. +func NewHash(newHash []byte) (*Hash, error) { + nhlen := len(newHash) + if nhlen != HashSize { + return nil, fmt.Errorf("invalid hash length of %v, want %v", + nhlen, HashSize) + } + + var hash Hash + copy(hash[:], newHash) + + return &hash, nil +} + +// NewHashFromStr creates a Hash from a hex hash string. +func NewHashFromStr(newHash string) (*Hash, error) { + // Return error if hash string is of incorrect length. + if len(newHash) != HashSize*2 { + return nil, fmt.Errorf("invalid hash string length of %v, "+ + "want %v", len(newHash), HashSize*2) + } + + hash, err := hex.DecodeString(newHash) + if err != nil { + return nil, err + } + + return NewHash(hash) +} diff --git a/lntypes/preimage.go b/lntypes/preimage.go new file mode 100644 index 00000000..4c4289c7 --- /dev/null +++ b/lntypes/preimage.go @@ -0,0 +1,55 @@ +package lntypes + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" +) + +// PreimageSize of array used to store preimagees. +const PreimageSize = 32 + +// Preimage is used in several of the lightning messages and common structures. It +// represents a payment preimage. +type Preimage [PreimageSize]byte + +// String returns the Preimage as a hexadecimal string. +func (p Preimage) String() string { + return hex.EncodeToString(p[:]) +} + +// NewPreimage returns a new Preimage from a byte slice. An error is returned if +// the number of bytes passed in is not PreimageSize. +func NewPreimage(newPreimage []byte) (*Preimage, error) { + nhlen := len(newPreimage) + if nhlen != PreimageSize { + return nil, fmt.Errorf("invalid preimage length of %v, want %v", + nhlen, PreimageSize) + } + + var preimage Preimage + copy(preimage[:], newPreimage) + + return &preimage, nil +} + +// NewPreimageFromStr creates a Preimage from a hex preimage string. +func NewPreimageFromStr(newPreimage string) (*Preimage, error) { + // Return error if preimage string is of incorrect length. + if len(newPreimage) != PreimageSize*2 { + return nil, fmt.Errorf("invalid preimage string length of %v, "+ + "want %v", len(newPreimage), PreimageSize*2) + } + + preimage, err := hex.DecodeString(newPreimage) + if err != nil { + return nil, err + } + + return NewPreimage(preimage) +} + +// Hash returns the sha256 hash of the preimage. +func (p *Preimage) Hash() Hash { + return Hash(sha256.Sum256(p[:])) +} From bacd92418aecfda10cb1822969b9bf1313790236 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 15 Jan 2019 11:31:22 +0100 Subject: [PATCH 02/11] invoices: use lntypes.Hash and lntypes.Preimage Previously chainhash.Hash was used, which converts to/from string in reversed format. Payment hashes and preimages are supposed to be non-reversed. --- channeldb/invoices.go | 3 ++- contractcourt/chain_arbitrator.go | 3 ++- contractcourt/channel_arbitrator_test.go | 3 ++- htlcswitch/interfaces.go | 6 +++--- htlcswitch/link.go | 4 ++-- htlcswitch/mock.go | 14 +++++++------- htlcswitch/test_utils.go | 7 ++++--- invoices/invoiceregistry.go | 21 +++++++++++---------- witness_beacon.go | 4 ++-- 9 files changed, 35 insertions(+), 30 deletions(-) diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 5915cb4c..d7396376 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -10,6 +10,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" ) @@ -105,7 +106,7 @@ type ContractTerm struct { // PaymentPreimage is the preimage which is to be revealed in the // occasion that an HTLC paying to the hash of this preimage is // extended. - PaymentPreimage [32]byte + PaymentPreimage lntypes.Preimage // Value is the expected amount of milli-satoshis to be paid to an HTLC // which can be satisfied by the above preimage. diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 556fba2d..18004b88 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -12,6 +12,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/sweep" @@ -139,7 +140,7 @@ type ChainArbitratorConfig struct { // SettleInvoice attempts to settle an existing invoice on-chain with // the given payment hash. ErrInvoiceNotFound is returned if an invoice // is not found. - SettleInvoice func(chainhash.Hash, lnwire.MilliSatoshi) error + SettleInvoice func(lntypes.Hash, lnwire.MilliSatoshi) error } // ChainArbitrator is a sub-system that oversees the on-chain resolution of all diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index 4518fac0..7fb14d5a 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -11,6 +11,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -176,7 +177,7 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator, *lnwallet.IncomingHtlcResolution, uint32) error { return nil }, - SettleInvoice: func(chainhash.Hash, lnwire.MilliSatoshi) error { + SettleInvoice: func(lntypes.Hash, lnwire.MilliSatoshi) error { return nil }, } diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 3cb5f35c..8614dfaf 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -1,9 +1,9 @@ package htlcswitch import ( - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" ) @@ -14,11 +14,11 @@ type InvoiceDatabase interface { // byte payment hash. This method should also reutrn the min final CLTV // delta for this invoice. We'll use this to ensure that the HTLC // extended to us gives us enough time to settle as we prescribe. - LookupInvoice(chainhash.Hash) (channeldb.Invoice, uint32, error) + LookupInvoice(lntypes.Hash) (channeldb.Invoice, uint32, error) // SettleInvoice attempts to mark an invoice corresponding to the // passed payment hash as fully settled. - SettleInvoice(payHash chainhash.Hash, paidAmount lnwire.MilliSatoshi) error + SettleInvoice(payHash lntypes.Hash, paidAmount lnwire.MilliSatoshi) error } // ChannelLink is an interface which represents the subsystem for managing the diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 18da280e..87b4ebe5 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/channeldb" @@ -17,6 +16,7 @@ import ( "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/ticker" @@ -2312,7 +2312,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // We're the designated payment destination. Therefore // we attempt to see if we have an invoice locally // which'll allow us to settle this htlc. - invoiceHash := chainhash.Hash(pd.RHash) + invoiceHash := lntypes.Hash(pd.RHash) invoice, minCltvDelta, err := l.cfg.Registry.LookupInvoice( invoiceHash, ) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index b2473af1..c9b44d5c 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -17,7 +17,6 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/fastsha256" "github.com/go-errors/errors" "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/chainntnfs" @@ -25,6 +24,7 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/ticker" @@ -686,18 +686,18 @@ var _ ChannelLink = (*mockChannelLink)(nil) type mockInvoiceRegistry struct { sync.Mutex - invoices map[chainhash.Hash]channeldb.Invoice + invoices map[lntypes.Hash]channeldb.Invoice finalDelta uint32 } func newMockRegistry(minDelta uint32) *mockInvoiceRegistry { return &mockInvoiceRegistry{ finalDelta: minDelta, - invoices: make(map[chainhash.Hash]channeldb.Invoice), + invoices: make(map[lntypes.Hash]channeldb.Invoice), } } -func (i *mockInvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) { +func (i *mockInvoiceRegistry) LookupInvoice(rHash lntypes.Hash) (channeldb.Invoice, uint32, error) { i.Lock() defer i.Unlock() @@ -710,7 +710,7 @@ func (i *mockInvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Inv return invoice, i.finalDelta, nil } -func (i *mockInvoiceRegistry) SettleInvoice(rhash chainhash.Hash, +func (i *mockInvoiceRegistry) SettleInvoice(rhash lntypes.Hash, amt lnwire.MilliSatoshi) error { i.Lock() @@ -736,8 +736,8 @@ func (i *mockInvoiceRegistry) AddInvoice(invoice channeldb.Invoice) error { i.Lock() defer i.Unlock() - rhash := fastsha256.Sum256(invoice.Terms.PaymentPreimage[:]) - i.invoices[chainhash.Hash(rhash)] = invoice + rhash := invoice.Terms.PaymentPreimage.Hash() + i.invoices[rhash] = invoice return nil } diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index d7518de1..9677fac0 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -27,6 +27,7 @@ import ( "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" @@ -651,11 +652,11 @@ func generateHops(payAmt lnwire.MilliSatoshi, startingHeight uint32, } type paymentResponse struct { - rhash chainhash.Hash + rhash lntypes.Hash err chan error } -func (r *paymentResponse) Wait(d time.Duration) (chainhash.Hash, error) { +func (r *paymentResponse) Wait(d time.Duration) (lntypes.Hash, error) { select { case err := <-r.err: close(r.err) @@ -680,7 +681,7 @@ func (n *threeHopNetwork) makePayment(sendingPeer, receivingPeer lnpeer.Peer, paymentErr := make(chan error, 1) - var rhash chainhash.Hash + var rhash lntypes.Hash sender := sendingPeer.(*mockServer) receiver := receivingPeer.(*mockServer) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index e330ccaa..a9bf44b9 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -2,17 +2,16 @@ package invoices import ( "bytes" - "crypto/sha256" "fmt" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/zpay32" @@ -24,10 +23,10 @@ var ( // All nodes initialized with the flag active will immediately settle // any incoming HTLC whose rHash corresponds with the debug // preimage. - DebugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32)) + DebugPre, _ = lntypes.NewPreimage(bytes.Repeat([]byte{1}, 32)) // DebugHash is the hash of the default preimage. - DebugHash = chainhash.Hash(sha256.Sum256(DebugPre[:])) + DebugHash = DebugPre.Hash() ) // InvoiceRegistry is a central registry of all the outstanding invoices @@ -49,7 +48,7 @@ type InvoiceRegistry struct { // debugInvoices is a map which stores special "debug" invoices which // should be only created/used when manual tests require an invoice // that *all* nodes are able to fully settle. - debugInvoices map[chainhash.Hash]*channeldb.Invoice + debugInvoices map[lntypes.Hash]*channeldb.Invoice activeNetParams *chaincfg.Params @@ -66,7 +65,7 @@ func NewRegistry(cdb *channeldb.DB, return &InvoiceRegistry{ cdb: cdb, - debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice), + debugInvoices: make(map[lntypes.Hash]*channeldb.Invoice), notificationClients: make(map[uint32]*InvoiceSubscription), newSubscriptions: make(chan *InvoiceSubscription), subscriptionCancels: make(chan uint32), @@ -264,8 +263,10 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro // by the passed preimage. Once this invoice is added, subsystems within the // daemon add/forward HTLCs that are able to obtain the proper preimage // required for redemption in the case that we're the final destination. -func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) { - paymentHash := chainhash.Hash(sha256.Sum256(preimage[:])) +func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, + preimage lntypes.Preimage) { + + paymentHash := preimage.Hash() invoice := &channeldb.Invoice{ CreationDate: time.Now(), @@ -318,7 +319,7 @@ func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) // according to the cltv delta. // // TODO(roasbeef): ignore if settled? -func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) { +func (i *InvoiceRegistry) LookupInvoice(rHash lntypes.Hash) (channeldb.Invoice, uint32, error) { // First check the in-memory debug invoice index to see if this is an // existing invoice added for debugging. i.RLock() @@ -350,7 +351,7 @@ func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice // SettleInvoice attempts to mark an invoice as settled. If the invoice is a // debug invoice, then this method is a noop as debug invoices are never fully // settled. -func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash, +func (i *InvoiceRegistry) SettleInvoice(rHash lntypes.Hash, amtPaid lnwire.MilliSatoshi) error { i.Lock() diff --git a/witness_beacon.go b/witness_beacon.go index 04cc7466..d7b92b01 100644 --- a/witness_beacon.go +++ b/witness_beacon.go @@ -3,10 +3,10 @@ package main import ( "sync" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/invoices" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" ) @@ -72,7 +72,7 @@ func (p *preimageBeacon) LookupPreimage(payHash []byte) ([]byte, bool) { // First, we'll check the invoice registry to see if we already know of // the preimage as it's on that we created ourselves. - var invoiceKey chainhash.Hash + var invoiceKey lntypes.Hash copy(invoiceKey[:], payHash) invoice, _, err := p.invoices.LookupInvoice(invoiceKey) switch { From 3545685177cd8d194b24bdd756795ab784a24b6a Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 20 Dec 2018 11:42:28 +0100 Subject: [PATCH 03/11] invoicesrpc: create sub server Sub server implementation is still empty. This is a preparatory step for adding invoice functionality. --- lnd.go | 2 +- lnrpc/invoicesrpc/config_active.go | 16 +++++ lnrpc/invoicesrpc/config_default.go | 6 ++ lnrpc/invoicesrpc/driver.go | 55 +++++++++++++++++ lnrpc/invoicesrpc/invoices.pb.go | 77 ++++++++++++++++++++++++ lnrpc/invoicesrpc/invoices.proto | 11 ++++ lnrpc/invoicesrpc/invoices_server.go | 89 ++++++++++++++++++++++++++++ lnrpc/invoicesrpc/log.go | 45 ++++++++++++++ log.go | 4 ++ rpcserver.go | 3 +- subrpcserver_config.go | 16 ++++- 11 files changed, 321 insertions(+), 3 deletions(-) create mode 100644 lnrpc/invoicesrpc/config_active.go create mode 100644 lnrpc/invoicesrpc/config_default.go create mode 100644 lnrpc/invoicesrpc/driver.go create mode 100644 lnrpc/invoicesrpc/invoices.pb.go create mode 100644 lnrpc/invoicesrpc/invoices.proto create mode 100644 lnrpc/invoicesrpc/invoices_server.go create mode 100644 lnrpc/invoicesrpc/log.go diff --git a/lnd.go b/lnd.go index 46adb4a4..96e5b9d6 100644 --- a/lnd.go +++ b/lnd.go @@ -337,7 +337,7 @@ func lndMain() error { // exported by the rpcServer. rpcServer, err := newRPCServer( server, macaroonService, cfg.SubRPCServers, serverOpts, - proxyOpts, atplManager, tlsConf, + proxyOpts, atplManager, server.invoices, tlsConf, ) if err != nil { srvrLog.Errorf("unable to start RPC server: %v", err) diff --git a/lnrpc/invoicesrpc/config_active.go b/lnrpc/invoicesrpc/config_active.go new file mode 100644 index 00000000..4cdb8f4b --- /dev/null +++ b/lnrpc/invoicesrpc/config_active.go @@ -0,0 +1,16 @@ +// +build invoicesrpc + +package invoicesrpc + +import ( + "github.com/lightningnetwork/lnd/invoices" +) + +// Config is the primary configuration struct for the invoices RPC server. It +// contains all the items required for the rpc server to carry out its +// duties. The fields with struct tags are meant to be parsed as normal +// configuration options, while if able to be populated, the latter fields MUST +// also be specified. +type Config struct { + InvoiceRegistry *invoices.InvoiceRegistry +} diff --git a/lnrpc/invoicesrpc/config_default.go b/lnrpc/invoicesrpc/config_default.go new file mode 100644 index 00000000..bb40c480 --- /dev/null +++ b/lnrpc/invoicesrpc/config_default.go @@ -0,0 +1,6 @@ +// +build !invoicesrpc + +package invoicesrpc + +// Config is empty for non-invoicesrpc builds. +type Config struct{} diff --git a/lnrpc/invoicesrpc/driver.go b/lnrpc/invoicesrpc/driver.go new file mode 100644 index 00000000..54ad1fc1 --- /dev/null +++ b/lnrpc/invoicesrpc/driver.go @@ -0,0 +1,55 @@ +// +build invoicesrpc + +package invoicesrpc + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/lnrpc" +) + +// createNewSubServer is a helper method that will create the new sub server +// given the main config dispatcher method. If we're unable to find the config +// that is meant for us in the config dispatcher, then we'll exit with an +// error. +func createNewSubServer(configRegistry lnrpc.SubServerConfigDispatcher) ( + lnrpc.SubServer, lnrpc.MacaroonPerms, error) { + + // We'll attempt to look up the config that we expect, according to our + // subServerName name. If we can't find this, then we'll exit with an + // error, as we're unable to properly initialize ourselves without this + // config. + subServerConf, ok := configRegistry.FetchConfig(subServerName) + if !ok { + return nil, nil, fmt.Errorf("unable to find config for "+ + "subserver type %s", subServerName) + } + + // Now that we've found an object mapping to our service name, we'll + // ensure that it's the type we need. + config, ok := subServerConf.(*Config) + if !ok { + return nil, nil, fmt.Errorf("wrong type of config for "+ + "subserver %s, expected %T got %T", subServerName, + &Config{}, subServerConf) + } + + return New(config) +} + +func init() { + subServer := &lnrpc.SubServerDriver{ + SubServerName: subServerName, + New: func(c lnrpc.SubServerConfigDispatcher) (lnrpc.SubServer, + lnrpc.MacaroonPerms, error) { + return createNewSubServer(c) + }, + } + + // If the build tag is active, then we'll register ourselves as a + // sub-RPC server within the global lnrpc package namespace. + if err := lnrpc.RegisterSubServer(subServer); err != nil { + panic(fmt.Sprintf("failed to register sub server driver "+ + "'%s': %v", subServerName, err)) + } +} diff --git a/lnrpc/invoicesrpc/invoices.pb.go b/lnrpc/invoicesrpc/invoices.pb.go new file mode 100644 index 00000000..44b4701e --- /dev/null +++ b/lnrpc/invoicesrpc/invoices.pb.go @@ -0,0 +1,77 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: invoicesrpc/invoices.proto + +package invoicesrpc // import "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// InvoicesClient is the client API for Invoices service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type InvoicesClient interface { +} + +type invoicesClient struct { + cc *grpc.ClientConn +} + +func NewInvoicesClient(cc *grpc.ClientConn) InvoicesClient { + return &invoicesClient{cc} +} + +// InvoicesServer is the server API for Invoices service. +type InvoicesServer interface { +} + +func RegisterInvoicesServer(s *grpc.Server, srv InvoicesServer) { + s.RegisterService(&_Invoices_serviceDesc, srv) +} + +var _Invoices_serviceDesc = grpc.ServiceDesc{ + ServiceName: "invoicesrpc.Invoices", + HandlerType: (*InvoicesServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + Metadata: "invoicesrpc/invoices.proto", +} + +func init() { + proto.RegisterFile("invoicesrpc/invoices.proto", fileDescriptor_invoices_560fa62749d29606) +} + +var fileDescriptor_invoices_560fa62749d29606 = []byte{ + // 105 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0xca, 0xcc, 0x2b, 0xcb, + 0xcf, 0x4c, 0x4e, 0x2d, 0x2e, 0x2a, 0x48, 0xd6, 0x87, 0xb1, 0xf5, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, + 0x85, 0xb8, 0x91, 0xe4, 0x8c, 0xb8, 0xb8, 0x38, 0x3c, 0xa1, 0x5c, 0x27, 0xe3, 0x28, 0xc3, 0xf4, + 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0xfd, 0x9c, 0xcc, 0xf4, 0x8c, 0x92, 0xbc, + 0xcc, 0xbc, 0xf4, 0xbc, 0xd4, 0x92, 0xf2, 0xfc, 0xa2, 0x6c, 0xfd, 0x9c, 0xbc, 0x14, 0xfd, 0x9c, + 0x3c, 0x64, 0x03, 0x8b, 0x0a, 0x92, 0x93, 0xd8, 0xc0, 0x86, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, + 0xff, 0xaa, 0xe0, 0xa9, 0x52, 0x72, 0x00, 0x00, 0x00, +} diff --git a/lnrpc/invoicesrpc/invoices.proto b/lnrpc/invoicesrpc/invoices.proto new file mode 100644 index 00000000..f21ba1cc --- /dev/null +++ b/lnrpc/invoicesrpc/invoices.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package invoicesrpc; + +option go_package = "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"; + +// Invoices is a service that can be used to create, accept, settle and cancel +// invoices. +service Invoices { +} + diff --git a/lnrpc/invoicesrpc/invoices_server.go b/lnrpc/invoicesrpc/invoices_server.go new file mode 100644 index 00000000..0cdb477a --- /dev/null +++ b/lnrpc/invoicesrpc/invoices_server.go @@ -0,0 +1,89 @@ +// +build invoicesrpc + +package invoicesrpc + +import ( + "github.com/lightningnetwork/lnd/lnrpc" + "google.golang.org/grpc" + "gopkg.in/macaroon-bakery.v2/bakery" +) + +const ( + // subServerName is the name of the sub rpc server. We'll use this name + // to register ourselves, and we also require that the main + // SubServerConfigDispatcher instance recognize it as the name of our + // RPC service. + subServerName = "InvoicesRPC" +) + +var ( + // macPermissions maps RPC calls to the permissions they require. + macPermissions = map[string][]bakery.Op{} +) + +// Server is a sub-server of the main RPC server: the invoices RPC. This sub +// RPC server allows external callers to access the status of the invoices +// currently active within lnd, as well as configuring it at runtime. +type Server struct { + started int32 // To be used atomically. + shutdown int32 // To be used atomically. + + cfg *Config +} + +// A compile time check to ensure that Server fully implements the +// InvoicesServer gRPC service. +var _ InvoicesServer = (*Server)(nil) + +// New returns a new instance of the invoicesrpc Invoices sub-server. We also +// return the set of permissions for the macaroons that we may create within +// this method. If the macaroons we need aren't found in the filepath, then +// we'll create them on start up. If we're unable to locate, or create the +// macaroons we need, then we'll return with an error. +func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) { + // We don't create any new macaroons for this subserver, instead reuse + // existing onchain/offchain permissions. + server := &Server{ + cfg: cfg, + } + + return server, macPermissions, nil +} + +// Start launches any helper goroutines required for the Server to function. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) Start() error { + return nil +} + +// Stop signals any active goroutines for a graceful closure. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) Stop() error { + return nil +} + +// Name returns a unique string representation of the sub-server. This can be +// used to identify the sub-server and also de-duplicate them. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) Name() string { + return subServerName +} + +// RegisterWithRootServer will be called by the root gRPC server to direct a sub +// RPC server to register itself with the main gRPC root server. Until this is +// called, each sub-server won't be able to have requests routed towards it. +// +// NOTE: This is part of the lnrpc.SubServer interface. +func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error { + // We make sure that we register it with the main gRPC server to ensure + // all our methods are routed properly. + RegisterInvoicesServer(grpcServer, s) + + log.Debugf("Invoices RPC server successfully register with root " + + "gRPC server") + + return nil +} diff --git a/lnrpc/invoicesrpc/log.go b/lnrpc/invoicesrpc/log.go new file mode 100644 index 00000000..d29c7378 --- /dev/null +++ b/lnrpc/invoicesrpc/log.go @@ -0,0 +1,45 @@ +package invoicesrpc + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This means the +// package will not perform any logging by default until the caller requests +// it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("IRPC", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled by +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. This +// should be used in preference to SetLogWriter if the caller is also using +// btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} + +// logClosure is used to provide a closure over expensive logging operations so +// don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/log.go b/log.go index ed06ad69..b066485a 100644 --- a/log.go +++ b/log.go @@ -21,6 +21,7 @@ import ( "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" + "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnwallet" @@ -78,6 +79,7 @@ var ( nannLog = build.NewSubLogger("NANN", backendLog.Logger) wtwrLog = build.NewSubLogger("WTWR", backendLog.Logger) ntfrLog = build.NewSubLogger("NTFR", backendLog.Logger) + irpcLog = build.NewSubLogger("IRPC", backendLog.Logger) ) // Initialize package-global logger variables. @@ -102,6 +104,7 @@ func init() { netann.UseLogger(nannLog) watchtower.UseLogger(wtwrLog) chainrpc.UseLogger(ntfrLog) + invoicesrpc.UseLogger(irpcLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -132,6 +135,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "NANN": nannLog, "WTWR": wtwrLog, "NTFR": ntfnLog, + "IRPC": irpcLog, } // initLogRotator initializes the logging rotator to write logs to logFile and diff --git a/rpcserver.go b/rpcserver.go index 5c8bfa85..813675c3 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -397,6 +397,7 @@ var _ lnrpc.LightningServer = (*rpcServer)(nil) func newRPCServer(s *server, macService *macaroons.Service, subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, restServerOpts []grpc.DialOption, atpl *autopilot.Manager, + invoiceRegistry *invoices.InvoiceRegistry, tlsCfg *tls.Config) (*rpcServer, error) { var ( @@ -408,7 +409,7 @@ func newRPCServer(s *server, macService *macaroons.Service, // the dependencies they need are properly populated within each sub // server configuration struct. err := subServerCgs.PopulateDependencies( - s.cc, networkDir, macService, atpl, + s.cc, networkDir, macService, atpl, invoiceRegistry, ) if err != nil { return nil, err diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 9d39732c..722cd788 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -5,8 +5,10 @@ import ( "reflect" "github.com/lightningnetwork/lnd/autopilot" + "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" + "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/macaroons" @@ -37,6 +39,10 @@ type subRPCServerConfigs struct { // client to be notified of certain on-chain events (new blocks, // confirmations, spends). ChainRPC *chainrpc.Config `group:"chainrpc" namespace:"chainrpc"` + + // InvoicesRPC is a sub-RPC server that exposes invoice related methods + // as a gRPC service. + InvoicesRPC *invoicesrpc.Config `group:"invoicesrpc" namespace:"invoicesrpc"` } // PopulateDependencies attempts to iterate through all the sub-server configs @@ -47,7 +53,8 @@ type subRPCServerConfigs struct { // FetchConfig method. func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, networkDir string, macService *macaroons.Service, - atpl *autopilot.Manager) error { + atpl *autopilot.Manager, + invoiceRegistry *invoices.InvoiceRegistry) error { // First, we'll use reflect to obtain a version of the config struct // that allows us to programmatically inspect its fields. @@ -125,6 +132,13 @@ func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, reflect.ValueOf(cc.chainNotifier), ) + case *invoicesrpc.Config: + subCfgValue := extractReflectValue(cfg) + + subCfgValue.FieldByName("InvoiceRegistry").Set( + reflect.ValueOf(invoiceRegistry), + ) + default: return fmt.Errorf("unknown field: %v, %T", fieldName, cfg) From 78cd07570bafaf719b7084287094cab966467a35 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 19 Dec 2018 16:15:09 +0100 Subject: [PATCH 04/11] invoiceregistry: extract dispatch to method --- invoices/invoiceregistry.go | 134 +++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 63 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index a9bf44b9..c4e45fe2 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -139,69 +139,7 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { // A sub-systems has just modified the invoice state, so we'll // dispatch notifications to all registered clients. case event := <-i.invoiceEvents: - for clientID, client := range i.notificationClients { - // Before we dispatch this event, we'll check - // to ensure that this client hasn't already - // received this notification in order to - // ensure we don't duplicate any events. - invoice := event.invoice - switch { - // If we've already sent this settle event to - // the client, then we can skip this. - case event.state == channeldb.ContractSettled && - client.settleIndex >= invoice.SettleIndex: - continue - - // Similarly, if we've already sent this add to - // the client then we can skip this one. - case event.state == channeldb.ContractOpen && - client.addIndex >= invoice.AddIndex: - continue - - // These two states should never happen, but we - // log them just in case so we can detect this - // instance. - case event.state == channeldb.ContractOpen && - client.addIndex+1 != invoice.AddIndex: - log.Warnf("client=%v for invoice "+ - "notifications missed an update, "+ - "add_index=%v, new add event index=%v", - clientID, client.addIndex, - invoice.AddIndex) - case event.state == channeldb.ContractSettled && - client.settleIndex+1 != invoice.SettleIndex: - log.Warnf("client=%v for invoice "+ - "notifications missed an update, "+ - "settle_index=%v, new settle event index=%v", - clientID, client.settleIndex, - invoice.SettleIndex) - } - - select { - case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - state: event.state, - invoice: invoice, - }: - case <-i.quit: - return - } - - // Each time we send a notification to a - // client, we'll record the latest add/settle - // index it has. We'll use this to ensure we - // don't send a notification twice, which can - // happen if a new event is added while we're - // catching up a new client. - switch event.state { - case channeldb.ContractSettled: - client.settleIndex = invoice.SettleIndex - case channeldb.ContractOpen: - client.addIndex = invoice.AddIndex - default: - log.Errorf("unknown invoice "+ - "state: %v", event.state) - } - } + i.dispatchToClients(event) case <-i.quit: return @@ -209,6 +147,76 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { } } +// dispatchToClients passes the supplied event to all notification clients that +// subscribed to all invoices. Add and settle indices are used to make sure that +// clients don't receive duplicate or unwanted events. +func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { + invoice := event.invoice + + for clientID, client := range i.notificationClients { + // Before we dispatch this event, we'll check + // to ensure that this client hasn't already + // received this notification in order to + // ensure we don't duplicate any events. + + // TODO(joostjager): Refactor switches. + switch { + // If we've already sent this settle event to + // the client, then we can skip this. + case event.state == channeldb.ContractSettled && + client.settleIndex >= invoice.SettleIndex: + continue + + // Similarly, if we've already sent this add to + // the client then we can skip this one. + case event.state == channeldb.ContractOpen && + client.addIndex >= invoice.AddIndex: + continue + + // These two states should never happen, but we + // log them just in case so we can detect this + // instance. + case event.state == channeldb.ContractOpen && + client.addIndex+1 != invoice.AddIndex: + log.Warnf("client=%v for invoice "+ + "notifications missed an update, "+ + "add_index=%v, new add event index=%v", + clientID, client.addIndex, + invoice.AddIndex) + + case event.state == channeldb.ContractSettled && + client.settleIndex+1 != invoice.SettleIndex: + log.Warnf("client=%v for invoice "+ + "notifications missed an update, "+ + "settle_index=%v, new settle event index=%v", + clientID, client.settleIndex, + invoice.SettleIndex) + } + + select { + case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + state: event.state, + invoice: invoice, + }: + case <-i.quit: + return + } + + // Each time we send a notification to a client, we'll record + // the latest add/settle index it has. We'll use this to ensure + // we don't send a notification twice, which can happen if a new + // event is added while we're catching up a new client. + switch event.state { + case channeldb.ContractSettled: + client.settleIndex = invoice.SettleIndex + case channeldb.ContractOpen: + client.addIndex = invoice.AddIndex + default: + log.Errorf("unknown invoice state: %v", event.state) + } + } +} + // deliverBacklogEvents will attempts to query the invoice database for any // notifications that the client has missed since it reconnected last. func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error { From 2a4c93cdc4bfbe52449f7cb7a0fba475fd942256 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 20 Dec 2018 15:25:04 +0100 Subject: [PATCH 05/11] lnrpc: add search path to gen_protos.sh Without this addition, sub servers cannot import google annotations in their proto files. --- lnrpc/gen_protos.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/lnrpc/gen_protos.sh b/lnrpc/gen_protos.sh index acd5d813..c6bddea6 100755 --- a/lnrpc/gen_protos.sh +++ b/lnrpc/gen_protos.sh @@ -28,6 +28,7 @@ do echo "Generating protos from ${file}, into ${DIRECTORY}" protoc -I/usr/local/include -I.\ + -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \ --go_out=plugins=grpc,paths=source_relative:. \ ${file} done From 436dd41c77fa17bb09098d8ce171108b1cd17a93 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 14 Jan 2019 12:03:26 +0100 Subject: [PATCH 06/11] channeldb: move idempotency up the call tree As a preparation for subscribing to single invoices, InvoiceRegistry needs to become aware of settling a settled invoice. --- channeldb/invoice_test.go | 6 +++--- channeldb/invoices.go | 22 +++++++++------------- invoices/invoiceregistry.go | 8 ++++++++ 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/channeldb/invoice_test.go b/channeldb/invoice_test.go index 68512607..3f35586b 100644 --- a/channeldb/invoice_test.go +++ b/channeldb/invoice_test.go @@ -359,10 +359,10 @@ func TestDuplicateSettleInvoice(t *testing.T) { } // If we try to settle the invoice again, then we should get the very - // same invoice back. + // same invoice back, but with an error this time. dbInvoice, err = db.SettleInvoice(payHash, amt) - if err != nil { - t.Fatalf("unable to settle invoice: %v", err) + if err != ErrInvoiceAlreadySettled { + t.Fatalf("expected ErrInvoiceAlreadySettled") } if dbInvoice == nil { diff --git a/channeldb/invoices.go b/channeldb/invoices.go index d7396376..c5223b17 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "errors" "fmt" "io" "time" @@ -57,6 +58,10 @@ var ( // // settleIndexNo => invoiceKey settleIndexBucket = []byte("invoice-settle-index") + + // ErrInvoiceAlreadySettled is returned when the invoice is already + // settled. + ErrInvoiceAlreadySettled = errors.New("invoice already settled") ) const ( @@ -626,21 +631,14 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, return ErrInvoiceNotFound } - invoice, err := settleInvoice( + settledInvoice, err = settleInvoice( invoices, settleIndex, invoiceNum, amtPaid, ) - if err != nil { - return err - } - settledInvoice = invoice - return nil + return err }) - if err != nil { - return nil, err - } - return settledInvoice, nil + return settledInvoice, err } // InvoicesSettledSince can be used by callers to catch up any settled invoices @@ -898,10 +896,8 @@ func settleInvoice(invoices, settleIndex *bbolt.Bucket, invoiceNum []byte, return nil, err } - // Add idempotency to duplicate settles, return here to avoid - // overwriting the previous info. if invoice.Terms.State == ContractSettled { - return &invoice, nil + return &invoice, ErrInvoiceAlreadySettled } // Now that we know the invoice hasn't already been settled, we'll diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index c4e45fe2..a34f7877 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -378,6 +378,14 @@ func (i *InvoiceRegistry) SettleInvoice(rHash lntypes.Hash, // If this isn't a debug invoice, then we'll attempt to settle an // invoice matching this rHash on disk (if one exists). invoice, err := i.cdb.SettleInvoice(rHash, amtPaid) + + // Implement idempotency by returning success if the invoice was already + // settled. + if err == channeldb.ErrInvoiceAlreadySettled { + log.Debugf("Invoice %v already settled", rHash) + return nil + } + if err != nil { return err } From acb016244359965ef72e0aecd12ba1c350359c6b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 20 Dec 2018 18:54:11 +0100 Subject: [PATCH 07/11] invoices: subscribe single invoice --- invoices/invoiceregistry.go | 237 +++++++++++++++++++++++++++++++----- rpcserver.go | 2 +- 2 files changed, 205 insertions(+), 34 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index a34f7877..16c28f2d 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -37,13 +37,15 @@ type InvoiceRegistry struct { cdb *channeldb.DB - clientMtx sync.Mutex - nextClientID uint32 - notificationClients map[uint32]*InvoiceSubscription + clientMtx sync.Mutex + nextClientID uint32 + notificationClients map[uint32]*InvoiceSubscription + singleNotificationClients map[uint32]*SingleInvoiceSubscription - newSubscriptions chan *InvoiceSubscription - subscriptionCancels chan uint32 - invoiceEvents chan *invoiceEvent + newSubscriptions chan *InvoiceSubscription + newSingleSubscriptions chan *SingleInvoiceSubscription + subscriptionCancels chan uint32 + invoiceEvents chan *invoiceEvent // debugInvoices is a map which stores special "debug" invoices which // should be only created/used when manual tests require an invoice @@ -64,14 +66,16 @@ func NewRegistry(cdb *channeldb.DB, activeNetParams *chaincfg.Params) *InvoiceRegistry { return &InvoiceRegistry{ - cdb: cdb, - debugInvoices: make(map[lntypes.Hash]*channeldb.Invoice), - notificationClients: make(map[uint32]*InvoiceSubscription), - newSubscriptions: make(chan *InvoiceSubscription), - subscriptionCancels: make(chan uint32), - invoiceEvents: make(chan *invoiceEvent, 100), - activeNetParams: activeNetParams, - quit: make(chan struct{}), + cdb: cdb, + debugInvoices: make(map[lntypes.Hash]*channeldb.Invoice), + notificationClients: make(map[uint32]*InvoiceSubscription), + singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription), + newSubscriptions: make(chan *InvoiceSubscription), + newSingleSubscriptions: make(chan *SingleInvoiceSubscription), + subscriptionCancels: make(chan uint32), + invoiceEvents: make(chan *invoiceEvent, 100), + activeNetParams: activeNetParams, + quit: make(chan struct{}), } } @@ -96,6 +100,7 @@ func (i *InvoiceRegistry) Stop() { // instance where invoices are settled. type invoiceEvent struct { state channeldb.ContractState + hash lntypes.Hash invoice *channeldb.Invoice } @@ -107,9 +112,9 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { for { select { - // A new invoice subscription has just arrived! We'll query for - // any backlog notifications, then add it to the set of - // clients. + // A new invoice subscription for all invoices has just arrived! + // We'll query for any backlog notifications, then add it to the + // set of clients. case newClient := <-i.newSubscriptions: // Before we add the client to our set of active // clients, we'll first attempt to deliver any backlog @@ -128,6 +133,23 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { // continue. i.notificationClients[newClient.id] = newClient + // A new single invoice subscription has arrived. We'll query + // for any backlog notifications, then add it to the set of + // clients. + case newClient := <-i.newSingleSubscriptions: + err := i.deliverSingleBacklogEvents(newClient) + if err != nil { + log.Errorf("Unable to deliver backlog invoice "+ + "notifications: %v", err) + } + + log.Infof("New single invoice subscription "+ + "client: id=%v, hash=%v", + newClient.id, newClient.hash, + ) + + i.singleNotificationClients[newClient.id] = newClient + // A client no longer wishes to receive invoice notifications. // So we'll remove them from the set of active clients. case clientID := <-i.subscriptionCancels: @@ -135,11 +157,13 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { "client=%v", clientID) delete(i.notificationClients, clientID) + delete(i.singleNotificationClients, clientID) // A sub-systems has just modified the invoice state, so we'll // dispatch notifications to all registered clients. case event := <-i.invoiceEvents: i.dispatchToClients(event) + i.dispatchToSingleClients(event) case <-i.quit: return @@ -147,6 +171,26 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { } } +// dispatchToSingleClients passes the supplied event to all notification clients +// that subscribed to all the invoice this event applies to. +func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) { + // Dispatch to single invoice subscribers. + for _, client := range i.singleNotificationClients { + if client.hash != event.hash { + continue + } + + select { + case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + state: event.state, + invoice: event.invoice, + }: + case <-i.quit: + return + } + } +} + // dispatchToClients passes the supplied event to all notification clients that // subscribed to all invoices. Add and settle indices are used to make sure that // clients don't receive duplicate or unwanted events. @@ -227,6 +271,7 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro if err != nil { return err } + settleEvents, err := i.cdb.InvoicesSettledSince(client.settleIndex) if err != nil { return err @@ -249,6 +294,7 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro return fmt.Errorf("registry shutting down") } } + for _, settleEvent := range settleEvents { // We re-bind the loop variable to ensure we don't hold onto // the loop reference causing is to point to the same item. @@ -267,6 +313,37 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro return nil } +// deliverSingleBacklogEvents will attempt to query the invoice database to +// retrieve the current invoice state and deliver this to the subscriber. Single +// invoice subscribers will always receive the current state right after +// subscribing. Only in case the invoice does not yet exist, nothing is sent +// yet. +func (i *InvoiceRegistry) deliverSingleBacklogEvents( + client *SingleInvoiceSubscription) error { + + invoice, err := i.cdb.LookupInvoice(client.hash) + + // It is possible that the invoice does not exist yet, but the client is + // already watching it in anticipation. + if err == channeldb.ErrInvoiceNotFound { + return nil + } + if err != nil { + return err + } + + err = client.notify(&invoiceEvent{ + hash: client.hash, + invoice: &invoice, + state: invoice.Terms.State, + }) + if err != nil { + return err + } + + return nil +} + // AddDebugInvoice adds a debug invoice for the specified amount, identified // by the passed preimage. Once this invoice is added, subsystems within the // daemon add/forward HTLCs that are able to obtain the proper preimage @@ -300,7 +377,9 @@ func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, // redemption in the case that we're the final destination. We also return the // addIndex of the newly created invoice which monotonically increases for each // new invoice added. -func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) { +func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice, + paymentHash lntypes.Hash) (uint64, error) { + i.Lock() defer i.Unlock() @@ -315,7 +394,7 @@ func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) // Now that we've added the invoice, we'll send dispatch a message to // notify the clients of this new invoice. - i.notifyClients(invoice, channeldb.ContractOpen) + i.notifyClients(paymentHash, invoice, channeldb.ContractOpen) return addIndex, nil } @@ -392,19 +471,21 @@ func (i *InvoiceRegistry) SettleInvoice(rHash lntypes.Hash, log.Infof("Payment received: %v", spew.Sdump(invoice)) - i.notifyClients(invoice, channeldb.ContractSettled) + i.notifyClients(rHash, invoice, channeldb.ContractSettled) return nil } // notifyClients notifies all currently registered invoice notification clients // of a newly added/settled invoice. -func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice, +func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash, + invoice *channeldb.Invoice, state channeldb.ContractState) { event := &invoiceEvent{ state: state, invoice: invoice, + hash: hash, } select { @@ -413,13 +494,25 @@ func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice, } } +// invoiceSubscriptionKit defines that are common to both all invoice +// subscribers and single invoice subscribers. +type invoiceSubscriptionKit struct { + id uint32 + inv *InvoiceRegistry + ntfnQueue *queue.ConcurrentQueue + + cancelled uint32 // To be used atomically. + cancelChan chan struct{} + wg sync.WaitGroup +} + // InvoiceSubscription represents an intent to receive updates for newly added // or settled invoices. For each newly added invoice, a copy of the invoice // will be sent over the NewInvoices channel. Similarly, for each newly settled // invoice, a copy of the invoice will be sent over the SettledInvoices // channel. type InvoiceSubscription struct { - cancelled uint32 // To be used atomically. + invoiceSubscriptionKit // NewInvoices is a channel that we'll use to send all newly created // invoices with an invoice index greater than the specified @@ -443,21 +536,23 @@ type InvoiceSubscription struct { // greater than this will be dispatched before any new notifications // are sent out. settleIndex uint64 +} - ntfnQueue *queue.ConcurrentQueue +// SingleInvoiceSubscription represents an intent to receive updates for a +// specific invoice. +type SingleInvoiceSubscription struct { + invoiceSubscriptionKit - id uint32 + hash lntypes.Hash - inv *InvoiceRegistry - - cancelChan chan struct{} - - wg sync.WaitGroup + // Updates is a channel that we'll use to send all invoice events for + // the invoice that is subscribed to. + Updates chan *channeldb.Invoice } // Cancel unregisters the InvoiceSubscription, freeing any previously allocated // resources. -func (i *InvoiceSubscription) Cancel() { +func (i *invoiceSubscriptionKit) Cancel() { if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) { return } @@ -473,6 +568,16 @@ func (i *InvoiceSubscription) Cancel() { i.wg.Wait() } +func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error { + select { + case i.ntfnQueue.ChanIn() <- event: + case <-i.inv.quit: + return fmt.Errorf("registry shutting down") + } + + return nil +} + // SubscribeNotifications returns an InvoiceSubscription which allows the // caller to receive async notifications when any invoices are settled or // added. The invoiceIndex parameter is a streaming "checkpoint". We'll start @@ -484,9 +589,11 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * SettledInvoices: make(chan *channeldb.Invoice), addIndex: addIndex, settleIndex: settleIndex, - inv: i, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + invoiceSubscriptionKit: invoiceSubscriptionKit{ + inv: i, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), + }, } client.ntfnQueue.Start() @@ -551,3 +658,67 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * return client } + +// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the +// caller to receive async notifications for a specific invoice. +func (i *InvoiceRegistry) SubscribeSingleInvoice( + hash lntypes.Hash) *SingleInvoiceSubscription { + + client := &SingleInvoiceSubscription{ + Updates: make(chan *channeldb.Invoice), + invoiceSubscriptionKit: invoiceSubscriptionKit{ + inv: i, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), + }, + hash: hash, + } + client.ntfnQueue.Start() + + i.clientMtx.Lock() + client.id = i.nextClientID + i.nextClientID++ + i.clientMtx.Unlock() + + // Before we register this new invoice subscription, we'll launch a new + // goroutine that will proxy all notifications appended to the end of + // the concurrent queue to the two client-side channels the caller will + // feed off of. + i.wg.Add(1) + go func() { + defer i.wg.Done() + + for { + select { + // A new invoice event has been sent by the + // invoiceRegistry. We will dispatch the event to the + // client. + case ntfn := <-client.ntfnQueue.ChanOut(): + invoiceEvent := ntfn.(*invoiceEvent) + + select { + case client.Updates <- invoiceEvent.invoice: + + case <-client.cancelChan: + return + + case <-i.quit: + return + } + + case <-client.cancelChan: + return + + case <-i.quit: + return + } + } + }() + + select { + case i.newSingleSubscriptions <- client: + case <-i.quit: + } + + return client +} diff --git a/rpcserver.go b/rpcserver.go index 813675c3..934c54e4 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3320,7 +3320,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context, ) // With all sanity checks passed, write the invoice to the database. - addIndex, err := r.server.invoices.AddInvoice(newInvoice) + addIndex, err := r.server.invoices.AddInvoice(newInvoice, rHash) if err != nil { return nil, err } From 4c4536a4883d40900d98633a1a25aecf699360bf Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 3 Jan 2019 19:13:08 +0100 Subject: [PATCH 08/11] lnrpc: move invoice marshall code to package As a preparation for reusing the marshall code in the invoices sub server. --- lnrpc/invoicesrpc/utils.go | 119 +++++++++++++++++++++++++++++++++++ rpcserver.go | 124 +++++-------------------------------- 2 files changed, 133 insertions(+), 110 deletions(-) create mode 100644 lnrpc/invoicesrpc/utils.go diff --git a/lnrpc/invoicesrpc/utils.go b/lnrpc/invoicesrpc/utils.go new file mode 100644 index 00000000..47ed217d --- /dev/null +++ b/lnrpc/invoicesrpc/utils.go @@ -0,0 +1,119 @@ +package invoicesrpc + +import ( + "encoding/hex" + "fmt" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/zpay32" +) + +// CreateRPCInvoice creates an *lnrpc.Invoice from the *channeldb.Invoice. +func CreateRPCInvoice(invoice *channeldb.Invoice, + activeNetParams *chaincfg.Params) (*lnrpc.Invoice, error) { + + paymentRequest := string(invoice.PaymentRequest) + decoded, err := zpay32.Decode(paymentRequest, activeNetParams) + if err != nil { + return nil, fmt.Errorf("unable to decode payment request: %v", + err) + } + + var descHash []byte + if decoded.DescriptionHash != nil { + descHash = decoded.DescriptionHash[:] + } + + fallbackAddr := "" + if decoded.FallbackAddr != nil { + fallbackAddr = decoded.FallbackAddr.String() + } + + settleDate := int64(0) + if !invoice.SettleDate.IsZero() { + settleDate = invoice.SettleDate.Unix() + } + + // Expiry time will default to 3600 seconds if not specified + // explicitly. + expiry := int64(decoded.Expiry().Seconds()) + + // The expiry will default to 9 blocks if not specified explicitly. + cltvExpiry := decoded.MinFinalCLTVExpiry() + + // Convert between the `lnrpc` and `routing` types. + routeHints := CreateRPCRouteHints(decoded.RouteHints) + + preimage := invoice.Terms.PaymentPreimage + satAmt := invoice.Terms.Value.ToSatoshis() + satAmtPaid := invoice.AmtPaid.ToSatoshis() + + isSettled := invoice.Terms.State == channeldb.ContractSettled + + var state lnrpc.Invoice_InvoiceState + switch invoice.Terms.State { + case channeldb.ContractOpen: + state = lnrpc.Invoice_OPEN + case channeldb.ContractSettled: + state = lnrpc.Invoice_SETTLED + default: + return nil, fmt.Errorf("unknown invoice state") + } + + return &lnrpc.Invoice{ + Memo: string(invoice.Memo[:]), + Receipt: invoice.Receipt[:], + RHash: decoded.PaymentHash[:], + RPreimage: preimage[:], + Value: int64(satAmt), + CreationDate: invoice.CreationDate.Unix(), + SettleDate: settleDate, + Settled: isSettled, + PaymentRequest: paymentRequest, + DescriptionHash: descHash, + Expiry: expiry, + CltvExpiry: cltvExpiry, + FallbackAddr: fallbackAddr, + RouteHints: routeHints, + AddIndex: invoice.AddIndex, + Private: len(routeHints) > 0, + SettleIndex: invoice.SettleIndex, + AmtPaidSat: int64(satAmtPaid), + AmtPaidMsat: int64(invoice.AmtPaid), + AmtPaid: int64(invoice.AmtPaid), + State: state, + }, nil +} + +// CreateRPCRouteHints takes in the decoded form of an invoice's route hints +// and converts them into the lnrpc type. +func CreateRPCRouteHints(routeHints [][]routing.HopHint) []*lnrpc.RouteHint { + var res []*lnrpc.RouteHint + + for _, route := range routeHints { + hopHints := make([]*lnrpc.HopHint, 0, len(route)) + for _, hop := range route { + pubKey := hex.EncodeToString( + hop.NodeID.SerializeCompressed(), + ) + + hint := &lnrpc.HopHint{ + NodeId: pubKey, + ChanId: hop.ChannelID, + FeeBaseMsat: hop.FeeBaseMSat, + FeeProportionalMillionths: hop.FeeProportionalMillionths, + CltvExpiryDelta: uint32(hop.CLTVExpiryDelta), + } + + hopHints = append(hopHints, hint) + } + + routeHint := &lnrpc.RouteHint{HopHints: hopHints} + res = append(res, routeHint) + } + + return res +} diff --git a/rpcserver.go b/rpcserver.go index 934c54e4..e8151677 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -35,6 +35,7 @@ import ( "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/macaroons" @@ -3332,111 +3333,6 @@ func (r *rpcServer) AddInvoice(ctx context.Context, }, nil } -// createRPCInvoice creates an *lnrpc.Invoice from the *channeldb.Invoice. -func createRPCInvoice(invoice *channeldb.Invoice) (*lnrpc.Invoice, error) { - paymentRequest := string(invoice.PaymentRequest) - decoded, err := zpay32.Decode(paymentRequest, activeNetParams.Params) - if err != nil { - return nil, fmt.Errorf("unable to decode payment request: %v", - err) - } - - descHash := []byte("") - if decoded.DescriptionHash != nil { - descHash = decoded.DescriptionHash[:] - } - - fallbackAddr := "" - if decoded.FallbackAddr != nil { - fallbackAddr = decoded.FallbackAddr.String() - } - - settleDate := int64(0) - if !invoice.SettleDate.IsZero() { - settleDate = invoice.SettleDate.Unix() - } - - // Expiry time will default to 3600 seconds if not specified - // explicitly. - expiry := int64(decoded.Expiry().Seconds()) - - // The expiry will default to 9 blocks if not specified explicitly. - cltvExpiry := decoded.MinFinalCLTVExpiry() - - // Convert between the `lnrpc` and `routing` types. - routeHints := createRPCRouteHints(decoded.RouteHints) - - preimage := invoice.Terms.PaymentPreimage - satAmt := invoice.Terms.Value.ToSatoshis() - satAmtPaid := invoice.AmtPaid.ToSatoshis() - - isSettled := invoice.Terms.State == channeldb.ContractSettled - - var state lnrpc.Invoice_InvoiceState - switch invoice.Terms.State { - case channeldb.ContractOpen: - state = lnrpc.Invoice_OPEN - case channeldb.ContractSettled: - state = lnrpc.Invoice_SETTLED - default: - return nil, fmt.Errorf("unknown invoice state") - } - - return &lnrpc.Invoice{ - Memo: string(invoice.Memo[:]), - Receipt: invoice.Receipt[:], - RHash: decoded.PaymentHash[:], - RPreimage: preimage[:], - Value: int64(satAmt), - CreationDate: invoice.CreationDate.Unix(), - SettleDate: settleDate, - Settled: isSettled, - PaymentRequest: paymentRequest, - DescriptionHash: descHash, - Expiry: expiry, - CltvExpiry: cltvExpiry, - FallbackAddr: fallbackAddr, - RouteHints: routeHints, - AddIndex: invoice.AddIndex, - Private: len(routeHints) > 0, - SettleIndex: invoice.SettleIndex, - AmtPaidSat: int64(satAmtPaid), - AmtPaidMsat: int64(invoice.AmtPaid), - AmtPaid: int64(invoice.AmtPaid), - State: state, - }, nil -} - -// createRPCRouteHints takes in the decoded form of an invoice's route hints -// and converts them into the lnrpc type. -func createRPCRouteHints(routeHints [][]routing.HopHint) []*lnrpc.RouteHint { - var res []*lnrpc.RouteHint - - for _, route := range routeHints { - hopHints := make([]*lnrpc.HopHint, 0, len(route)) - for _, hop := range route { - pubKey := hex.EncodeToString( - hop.NodeID.SerializeCompressed(), - ) - - hint := &lnrpc.HopHint{ - NodeId: pubKey, - ChanId: hop.ChannelID, - FeeBaseMsat: hop.FeeBaseMSat, - FeeProportionalMillionths: hop.FeeProportionalMillionths, - CltvExpiryDelta: uint32(hop.CLTVExpiryDelta), - } - - hopHints = append(hopHints, hint) - } - - routeHint := &lnrpc.RouteHint{HopHints: hopHints} - res = append(res, routeHint) - } - - return res -} - // LookupInvoice attempts to look up an invoice according to its payment hash. // The passed payment hash *must* be exactly 32 bytes, if not an error is // returned. @@ -3479,7 +3375,9 @@ func (r *rpcServer) LookupInvoice(ctx context.Context, return spew.Sdump(invoice) })) - rpcInvoice, err := createRPCInvoice(&invoice) + rpcInvoice, err := invoicesrpc.CreateRPCInvoice( + &invoice, activeNetParams.Params, + ) if err != nil { return nil, err } @@ -3519,7 +3417,9 @@ func (r *rpcServer) ListInvoices(ctx context.Context, LastIndexOffset: invoiceSlice.LastIndexOffset, } for i, invoice := range invoiceSlice.Invoices { - resp.Invoices[i], err = createRPCInvoice(&invoice) + resp.Invoices[i], err = invoicesrpc.CreateRPCInvoice( + &invoice, activeNetParams.Params, + ) if err != nil { return nil, err } @@ -3541,7 +3441,9 @@ func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription, for { select { case newInvoice := <-invoiceClient.NewInvoices: - rpcInvoice, err := createRPCInvoice(newInvoice) + rpcInvoice, err := invoicesrpc.CreateRPCInvoice( + newInvoice, activeNetParams.Params, + ) if err != nil { return err } @@ -3551,7 +3453,9 @@ func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription, } case settledInvoice := <-invoiceClient.SettledInvoices: - rpcInvoice, err := createRPCInvoice(settledInvoice) + rpcInvoice, err := invoicesrpc.CreateRPCInvoice( + settledInvoice, activeNetParams.Params, + ) if err != nil { return err } @@ -4455,7 +4359,7 @@ func (r *rpcServer) DecodePayReq(ctx context.Context, expiry := int64(payReq.Expiry().Seconds()) // Convert between the `lnrpc` and `routing` types. - routeHints := createRPCRouteHints(payReq.RouteHints) + routeHints := invoicesrpc.CreateRPCRouteHints(payReq.RouteHints) amt := int64(0) if payReq.MilliSat != nil { From 70c874be8810f83ddb26379d359fe198547f0f1f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 3 Jan 2019 19:15:14 +0100 Subject: [PATCH 09/11] invoicesrpc: add SubscribeSingleInvoice rpc --- lnrpc/file_utils.go | 15 ++++ lnrpc/invoicesrpc/config_active.go | 16 ++++ lnrpc/invoicesrpc/invoices.pb.go | 100 ++++++++++++++++++++++--- lnrpc/invoicesrpc/invoices.proto | 9 +++ lnrpc/invoicesrpc/invoices_server.go | 106 +++++++++++++++++++++++++-- rpcserver.go | 1 + subrpcserver_config.go | 13 +++- 7 files changed, 241 insertions(+), 19 deletions(-) create mode 100644 lnrpc/file_utils.go diff --git a/lnrpc/file_utils.go b/lnrpc/file_utils.go new file mode 100644 index 00000000..ffb83fbf --- /dev/null +++ b/lnrpc/file_utils.go @@ -0,0 +1,15 @@ +package lnrpc + +import ( + "os" +) + +// FileExists reports whether the named file or directory exists. +func FileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} diff --git a/lnrpc/invoicesrpc/config_active.go b/lnrpc/invoicesrpc/config_active.go index 4cdb8f4b..39261d5e 100644 --- a/lnrpc/invoicesrpc/config_active.go +++ b/lnrpc/invoicesrpc/config_active.go @@ -3,7 +3,9 @@ package invoicesrpc import ( + "github.com/btcsuite/btcd/chaincfg" "github.com/lightningnetwork/lnd/invoices" + "github.com/lightningnetwork/lnd/macaroons" ) // Config is the primary configuration struct for the invoices RPC server. It @@ -12,5 +14,19 @@ import ( // configuration options, while if able to be populated, the latter fields MUST // also be specified. type Config struct { + // NetworkDir is the main network directory wherein the invoices rpc + // server will find the macaroon named DefaultInvoicesMacFilename. + NetworkDir string + + // MacService is the main macaroon service that we'll use to handle + // authentication for the invoices rpc server. + MacService *macaroons.Service + + // InvoiceRegistry is a central registry of all the outstanding invoices + // created by the daemon. InvoiceRegistry *invoices.InvoiceRegistry + + // ChainParams are required to properly decode invoice payment requests + // that are marshalled over rpc. + ChainParams *chaincfg.Params } diff --git a/lnrpc/invoicesrpc/invoices.pb.go b/lnrpc/invoicesrpc/invoices.pb.go index 44b4701e..c79c254c 100644 --- a/lnrpc/invoicesrpc/invoices.pb.go +++ b/lnrpc/invoicesrpc/invoices.pb.go @@ -6,6 +6,8 @@ package invoicesrpc // import "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" +import lnrpc "github.com/lightningnetwork/lnd/lnrpc" +import _ "google.golang.org/genproto/googleapis/api/annotations" import ( context "golang.org/x/net/context" @@ -35,6 +37,11 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type InvoicesClient interface { + // * + // SubscribeSingleInvoice returns a uni-directional stream (server -> client) + // to notify the client of state transitions of the specified invoice. + // Initially the current invoice state is always sent out. + SubscribeSingleInvoice(ctx context.Context, in *lnrpc.PaymentHash, opts ...grpc.CallOption) (Invoices_SubscribeSingleInvoiceClient, error) } type invoicesClient struct { @@ -45,33 +52,102 @@ func NewInvoicesClient(cc *grpc.ClientConn) InvoicesClient { return &invoicesClient{cc} } +func (c *invoicesClient) SubscribeSingleInvoice(ctx context.Context, in *lnrpc.PaymentHash, opts ...grpc.CallOption) (Invoices_SubscribeSingleInvoiceClient, error) { + stream, err := c.cc.NewStream(ctx, &_Invoices_serviceDesc.Streams[0], "/invoicesrpc.Invoices/SubscribeSingleInvoice", opts...) + if err != nil { + return nil, err + } + x := &invoicesSubscribeSingleInvoiceClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Invoices_SubscribeSingleInvoiceClient interface { + Recv() (*lnrpc.Invoice, error) + grpc.ClientStream +} + +type invoicesSubscribeSingleInvoiceClient struct { + grpc.ClientStream +} + +func (x *invoicesSubscribeSingleInvoiceClient) Recv() (*lnrpc.Invoice, error) { + m := new(lnrpc.Invoice) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // InvoicesServer is the server API for Invoices service. type InvoicesServer interface { + // * + // SubscribeSingleInvoice returns a uni-directional stream (server -> client) + // to notify the client of state transitions of the specified invoice. + // Initially the current invoice state is always sent out. + SubscribeSingleInvoice(*lnrpc.PaymentHash, Invoices_SubscribeSingleInvoiceServer) error } func RegisterInvoicesServer(s *grpc.Server, srv InvoicesServer) { s.RegisterService(&_Invoices_serviceDesc, srv) } +func _Invoices_SubscribeSingleInvoice_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(lnrpc.PaymentHash) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(InvoicesServer).SubscribeSingleInvoice(m, &invoicesSubscribeSingleInvoiceServer{stream}) +} + +type Invoices_SubscribeSingleInvoiceServer interface { + Send(*lnrpc.Invoice) error + grpc.ServerStream +} + +type invoicesSubscribeSingleInvoiceServer struct { + grpc.ServerStream +} + +func (x *invoicesSubscribeSingleInvoiceServer) Send(m *lnrpc.Invoice) error { + return x.ServerStream.SendMsg(m) +} + var _Invoices_serviceDesc = grpc.ServiceDesc{ ServiceName: "invoicesrpc.Invoices", HandlerType: (*InvoicesServer)(nil), Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{}, - Metadata: "invoicesrpc/invoices.proto", + Streams: []grpc.StreamDesc{ + { + StreamName: "SubscribeSingleInvoice", + Handler: _Invoices_SubscribeSingleInvoice_Handler, + ServerStreams: true, + }, + }, + Metadata: "invoicesrpc/invoices.proto", } func init() { - proto.RegisterFile("invoicesrpc/invoices.proto", fileDescriptor_invoices_560fa62749d29606) + proto.RegisterFile("invoicesrpc/invoices.proto", fileDescriptor_invoices_c6414974947f2940) } -var fileDescriptor_invoices_560fa62749d29606 = []byte{ - // 105 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0xca, 0xcc, 0x2b, 0xcb, - 0xcf, 0x4c, 0x4e, 0x2d, 0x2e, 0x2a, 0x48, 0xd6, 0x87, 0xb1, 0xf5, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, - 0x85, 0xb8, 0x91, 0xe4, 0x8c, 0xb8, 0xb8, 0x38, 0x3c, 0xa1, 0x5c, 0x27, 0xe3, 0x28, 0xc3, 0xf4, - 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0xfd, 0x9c, 0xcc, 0xf4, 0x8c, 0x92, 0xbc, - 0xcc, 0xbc, 0xf4, 0xbc, 0xd4, 0x92, 0xf2, 0xfc, 0xa2, 0x6c, 0xfd, 0x9c, 0xbc, 0x14, 0xfd, 0x9c, - 0x3c, 0x64, 0x03, 0x8b, 0x0a, 0x92, 0x93, 0xd8, 0xc0, 0x86, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, - 0xff, 0xaa, 0xe0, 0xa9, 0x52, 0x72, 0x00, 0x00, 0x00, +var fileDescriptor_invoices_c6414974947f2940 = []byte{ + // 177 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x8e, 0xb1, 0x8e, 0xc2, 0x30, + 0x10, 0x44, 0x75, 0xcd, 0xe9, 0x2e, 0x27, 0x5d, 0xe1, 0x82, 0xc2, 0xe2, 0x1b, 0xb2, 0x40, 0x7a, + 0x0a, 0x2a, 0xa0, 0x42, 0x4a, 0x47, 0x67, 0x1b, 0xcb, 0x59, 0xe1, 0xec, 0x5a, 0xce, 0x06, 0xc4, + 0xdf, 0x23, 0x82, 0x91, 0xd2, 0x8d, 0x66, 0xe6, 0x49, 0xaf, 0xd2, 0x48, 0x37, 0x46, 0xe7, 0x87, + 0x9c, 0x1c, 0x7c, 0x72, 0x9d, 0x32, 0x0b, 0xab, 0xbf, 0xd9, 0xa6, 0x97, 0x81, 0x39, 0x44, 0x0f, + 0x26, 0x21, 0x18, 0x22, 0x16, 0x23, 0xc8, 0x54, 0xae, 0xfa, 0x37, 0x27, 0xf7, 0x8e, 0x9b, 0x63, + 0xf5, 0x73, 0x28, 0x9c, 0xda, 0x56, 0x8b, 0x76, 0xb4, 0x83, 0xcb, 0x68, 0x7d, 0x8b, 0x14, 0xa2, + 0x2f, 0x93, 0x52, 0x75, 0xa4, 0x17, 0x73, 0x32, 0x8f, 0xde, 0x93, 0xec, 0xcd, 0xd0, 0xe9, 0xff, + 0xd2, 0x95, 0xcf, 0xea, 0x6b, 0xd7, 0x9c, 0xd7, 0x01, 0xa5, 0x1b, 0x6d, 0xed, 0xb8, 0x87, 0x88, + 0xa1, 0x13, 0x42, 0x0a, 0xe4, 0xe5, 0xce, 0xf9, 0x0a, 0x91, 0x2e, 0x30, 0x21, 0x30, 0x33, 0xb5, + 0xdf, 0x93, 0x47, 0xf3, 0x0c, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x4e, 0x97, 0xf2, 0xdb, 0x00, 0x00, + 0x00, } diff --git a/lnrpc/invoicesrpc/invoices.proto b/lnrpc/invoicesrpc/invoices.proto index f21ba1cc..e0e6a709 100644 --- a/lnrpc/invoicesrpc/invoices.proto +++ b/lnrpc/invoicesrpc/invoices.proto @@ -1,5 +1,8 @@ syntax = "proto3"; +import "google/api/annotations.proto"; +import "rpc.proto"; + package invoicesrpc; option go_package = "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"; @@ -7,5 +10,11 @@ option go_package = "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"; // Invoices is a service that can be used to create, accept, settle and cancel // invoices. service Invoices { + /** + SubscribeSingleInvoice returns a uni-directional stream (server -> client) + to notify the client of state transitions of the specified invoice. + Initially the current invoice state is always sent out. + */ + rpc SubscribeSingleInvoice (lnrpc.PaymentHash) returns (stream lnrpc.Invoice); } diff --git a/lnrpc/invoicesrpc/invoices_server.go b/lnrpc/invoicesrpc/invoices_server.go index 0cdb477a..fb328297 100644 --- a/lnrpc/invoicesrpc/invoices_server.go +++ b/lnrpc/invoicesrpc/invoices_server.go @@ -3,9 +3,15 @@ package invoicesrpc import ( - "github.com/lightningnetwork/lnd/lnrpc" + "context" "google.golang.org/grpc" "gopkg.in/macaroon-bakery.v2/bakery" + "io/ioutil" + "os" + "path/filepath" + + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntypes" ) const ( @@ -17,8 +23,27 @@ const ( ) var ( + // macaroonOps are the set of capabilities that our minted macaroon (if + // it doesn't already exist) will have. + macaroonOps = []bakery.Op{ + { + Entity: "invoices", + Action: "read", + }, + } + // macPermissions maps RPC calls to the permissions they require. - macPermissions = map[string][]bakery.Op{} + macPermissions = map[string][]bakery.Op{ + "/invoicesrpc.Invoices/SubscribeSingleInvoice": {{ + Entity: "invoices", + Action: "read", + }}, + } + + // DefaultInvoicesMacFilename is the default name of the invoices + // macaroon that we expect to find via a file handle within the main + // configuration file in this package. + DefaultInvoicesMacFilename = "invoices.macaroon" ) // Server is a sub-server of the main RPC server: the invoices RPC. This sub @@ -28,6 +53,8 @@ type Server struct { started int32 // To be used atomically. shutdown int32 // To be used atomically. + quit chan struct{} + cfg *Config } @@ -41,10 +68,42 @@ var _ InvoicesServer = (*Server)(nil) // we'll create them on start up. If we're unable to locate, or create the // macaroons we need, then we'll return with an error. func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) { - // We don't create any new macaroons for this subserver, instead reuse - // existing onchain/offchain permissions. + // If the path of the invoices macaroon wasn't specified, then we'll + // assume that it's found at the default network directory. + macFilePath := filepath.Join( + cfg.NetworkDir, DefaultInvoicesMacFilename, + ) + + // Now that we know the full path of the invoices macaroon, we can + // check to see if we need to create it or not. + if !lnrpc.FileExists(macFilePath) && cfg.MacService != nil { + log.Infof("Baking macaroons for invoices RPC Server at: %v", + macFilePath) + + // At this point, we know that the invoices macaroon doesn't + // yet, exist, so we need to create it with the help of the + // main macaroon service. + invoicesMac, err := cfg.MacService.Oven.NewMacaroon( + context.Background(), bakery.LatestVersion, nil, + macaroonOps..., + ) + if err != nil { + return nil, nil, err + } + invoicesMacBytes, err := invoicesMac.M().MarshalBinary() + if err != nil { + return nil, nil, err + } + err = ioutil.WriteFile(macFilePath, invoicesMacBytes, 0644) + if err != nil { + os.Remove(macFilePath) + return nil, nil, err + } + } + server := &Server{ - cfg: cfg, + cfg: cfg, + quit: make(chan struct{}, 1), } return server, macPermissions, nil @@ -61,6 +120,8 @@ func (s *Server) Start() error { // // NOTE: This is part of the lnrpc.SubServer interface. func (s *Server) Stop() error { + close(s.quit) + return nil } @@ -82,8 +143,41 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error { // all our methods are routed properly. RegisterInvoicesServer(grpcServer, s) - log.Debugf("Invoices RPC server successfully register with root " + + log.Debugf("Invoices RPC server successfully registered with root " + "gRPC server") return nil } + +// SubscribeInvoices returns a uni-directional stream (server -> client) for +// notifying the client of invoice state changes. +func (s *Server) SubscribeSingleInvoice(req *lnrpc.PaymentHash, + updateStream Invoices_SubscribeSingleInvoiceServer) error { + + hash, err := lntypes.NewHash(req.RHash) + if err != nil { + return err + } + + invoiceClient := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(*hash) + defer invoiceClient.Cancel() + + for { + select { + case newInvoice := <-invoiceClient.Updates: + rpcInvoice, err := CreateRPCInvoice( + newInvoice, s.cfg.ChainParams, + ) + if err != nil { + return err + } + + if err := updateStream.Send(rpcInvoice); err != nil { + return err + } + + case <-s.quit: + return nil + } + } +} diff --git a/rpcserver.go b/rpcserver.go index e8151677..bcf81663 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -411,6 +411,7 @@ func newRPCServer(s *server, macService *macaroons.Service, // server configuration struct. err := subServerCgs.PopulateDependencies( s.cc, networkDir, macService, atpl, invoiceRegistry, + activeNetParams.Params, ) if err != nil { return nil, err diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 722cd788..cf7e8877 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" + "github.com/btcsuite/btcd/chaincfg" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" @@ -54,7 +55,8 @@ type subRPCServerConfigs struct { func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, networkDir string, macService *macaroons.Service, atpl *autopilot.Manager, - invoiceRegistry *invoices.InvoiceRegistry) error { + invoiceRegistry *invoices.InvoiceRegistry, + activeNetParams *chaincfg.Params) error { // First, we'll use reflect to obtain a version of the config struct // that allows us to programmatically inspect its fields. @@ -135,9 +137,18 @@ func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, case *invoicesrpc.Config: subCfgValue := extractReflectValue(cfg) + subCfgValue.FieldByName("NetworkDir").Set( + reflect.ValueOf(networkDir), + ) + subCfgValue.FieldByName("MacService").Set( + reflect.ValueOf(macService), + ) subCfgValue.FieldByName("InvoiceRegistry").Set( reflect.ValueOf(invoiceRegistry), ) + subCfgValue.FieldByName("ChainParams").Set( + reflect.ValueOf(activeNetParams), + ) default: return fmt.Errorf("unknown field: %v, %T", fieldName, From 8996a1490d511b69041b2a88fb989d46b30278cc Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 11 Jan 2019 10:50:12 +0100 Subject: [PATCH 10/11] lnrpc: reuse common FileExists function --- lnrpc/autopilotrpc/autopilot_server.go | 10 ---------- lnrpc/signrpc/signer_server.go | 12 +----------- lnrpc/walletrpc/walletkit_server.go | 12 +----------- 3 files changed, 2 insertions(+), 32 deletions(-) diff --git a/lnrpc/autopilotrpc/autopilot_server.go b/lnrpc/autopilotrpc/autopilot_server.go index f274de71..7d43dff8 100644 --- a/lnrpc/autopilotrpc/autopilot_server.go +++ b/lnrpc/autopilotrpc/autopilot_server.go @@ -60,16 +60,6 @@ type Server struct { // AutopilotServer gRPC service. var _ AutopilotServer = (*Server)(nil) -// fileExists reports whether the named file or directory exists. -func fileExists(name string) bool { - if _, err := os.Stat(name); err != nil { - if os.IsNotExist(err) { - return false - } - } - return true -} - // New returns a new instance of the autopilotrpc Autopilot sub-server. We also // return the set of permissions for the macaroons that we may create within // this method. If the macaroons we need aren't found in the filepath, then diff --git a/lnrpc/signrpc/signer_server.go b/lnrpc/signrpc/signer_server.go index 04ada861..20abf43b 100644 --- a/lnrpc/signrpc/signer_server.go +++ b/lnrpc/signrpc/signer_server.go @@ -69,16 +69,6 @@ type Server struct { // gRPC service. var _ SignerServer = (*Server)(nil) -// fileExists reports whether the named file or directory exists. -func fileExists(name string) bool { - if _, err := os.Stat(name); err != nil { - if os.IsNotExist(err) { - return false - } - } - return true -} - // New returns a new instance of the signrpc Signer sub-server. We also return // the set of permissions for the macaroons that we may create within this // method. If the macaroons we need aren't found in the filepath, then we'll @@ -96,7 +86,7 @@ func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) { // Now that we know the full path of the signer macaroon, we can check // to see if we need to create it or not. macFilePath := cfg.SignerMacPath - if cfg.MacService != nil && !fileExists(macFilePath) { + if cfg.MacService != nil && !lnrpc.FileExists(macFilePath) { log.Infof("Making macaroons for Signer RPC Server at: %v", macFilePath) diff --git a/lnrpc/walletrpc/walletkit_server.go b/lnrpc/walletrpc/walletkit_server.go index 77964ee8..1e2770b1 100644 --- a/lnrpc/walletrpc/walletkit_server.go +++ b/lnrpc/walletrpc/walletkit_server.go @@ -93,16 +93,6 @@ type WalletKit struct { // WalletKitServer gRPC service. var _ WalletKitServer = (*WalletKit)(nil) -// fileExists reports whether the named file or directory exists. -func fileExists(name string) bool { - if _, err := os.Stat(name); err != nil { - if os.IsNotExist(err) { - return false - } - } - return true -} - // New creates a new instance of the WalletKit sub-RPC server. func New(cfg *Config) (*WalletKit, lnrpc.MacaroonPerms, error) { // If the path of the wallet kit macaroon wasn't specified, then we'll @@ -116,7 +106,7 @@ func New(cfg *Config) (*WalletKit, lnrpc.MacaroonPerms, error) { // Now that we know the full path of the wallet kit macaroon, we can // check to see if we need to create it or not. macFilePath := cfg.WalletKitMacPath - if !fileExists(macFilePath) && cfg.MacService != nil { + if !lnrpc.FileExists(macFilePath) && cfg.MacService != nil { log.Infof("Baking macaroons for WalletKit RPC Server at: %v", macFilePath) From b16357116c3804f6ec2989f2a812ac8846e999cb Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 14 Jan 2019 10:09:46 +0100 Subject: [PATCH 11/11] invoices: add SubscribeSingleInvoice test --- invoices/invoiceregistry_test.go | 176 +++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 invoices/invoiceregistry_test.go diff --git a/invoices/invoiceregistry_test.go b/invoices/invoiceregistry_test.go new file mode 100644 index 00000000..f261467a --- /dev/null +++ b/invoices/invoiceregistry_test.go @@ -0,0 +1,176 @@ +package invoices + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + testTimeout = 5 * time.Second + + preimage = lntypes.Preimage{ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, + } + + hash = preimage.Hash() + + // testPayReq is a dummy payment request that does parse properly. It + // has no relation with the real invoice parameters and isn't asserted + // on in this test. LookupInvoice requires this to have a valid value. + testPayReq = "lnbc500u1pwywxzwpp5nd2u9xzq02t0tuf2654as7vma42lwkcjptx4yzfq0umq4swpa7cqdqqcqzysmlpc9ewnydr8rr8dnltyxphdyf6mcqrsd6dml8zajtyhwe6a45d807kxtmzayuf0hh2d9tn478ecxkecdg7c5g85pntupug5kakm7xcpn63zqk" +) + +// TestSettleInvoice tests settling of an invoice and related notifications. +func TestSettleInvoice(t *testing.T) { + cdb, cleanup, err := newDB() + if err != nil { + t.Fatal(err) + } + defer cleanup() + + // Instantiate and start the invoice registry. + registry := NewRegistry(cdb, &chaincfg.MainNetParams) + + err = registry.Start() + if err != nil { + t.Fatal(err) + } + defer registry.Stop() + + allSubscriptions := registry.SubscribeNotifications(0, 0) + defer allSubscriptions.Cancel() + + // Subscribe to the not yet existing invoice. + subscription := registry.SubscribeSingleInvoice(hash) + defer subscription.Cancel() + + if subscription.hash != hash { + t.Fatalf("expected subscription for provided hash") + } + + // Add the invoice. + invoice := &channeldb.Invoice{ + Terms: channeldb.ContractTerm{ + PaymentPreimage: preimage, + Value: lnwire.MilliSatoshi(100000), + }, + PaymentRequest: []byte(testPayReq), + } + + addIdx, err := registry.AddInvoice(invoice, hash) + if err != nil { + t.Fatal(err) + } + + if addIdx != 1 { + t.Fatalf("expected addIndex to start with 1, but got %v", + addIdx) + } + + // We expect the open state to be sent to the single invoice subscriber. + select { + case update := <-subscription.Updates: + if update.Terms.State != channeldb.ContractOpen { + t.Fatalf("expected state ContractOpen, but got %v", + update.Terms.State) + } + case <-time.After(testTimeout): + t.Fatal("no update received") + } + + // We expect a new invoice notification to be sent out. + select { + case newInvoice := <-allSubscriptions.NewInvoices: + if newInvoice.Terms.State != channeldb.ContractOpen { + t.Fatalf("expected state ContractOpen, but got %v", + newInvoice.Terms.State) + } + case <-time.After(testTimeout): + t.Fatal("no update received") + } + + // Settle invoice with a slightly higher amount. + amtPaid := lnwire.MilliSatoshi(100500) + err = registry.SettleInvoice(hash, amtPaid) + if err != nil { + t.Fatal(err) + } + + // We expect the settled state to be sent to the single invoice + // subscriber. + select { + case update := <-subscription.Updates: + if update.Terms.State != channeldb.ContractSettled { + t.Fatalf("expected state ContractOpen, but got %v", + update.Terms.State) + } + if update.AmtPaid != amtPaid { + t.Fatal("invoice AmtPaid incorrect") + } + case <-time.After(testTimeout): + t.Fatal("no update received") + } + + // We expect a settled notification to be sent out. + select { + case settledInvoice := <-allSubscriptions.SettledInvoices: + if settledInvoice.Terms.State != channeldb.ContractSettled { + t.Fatalf("expected state ContractOpen, but got %v", + settledInvoice.Terms.State) + } + case <-time.After(testTimeout): + t.Fatal("no update received") + } + + // Try to settle again. + err = registry.SettleInvoice(hash, amtPaid) + if err != nil { + t.Fatal("expected duplicate settle to succeed") + } + + // Try to settle again with a different amount. + err = registry.SettleInvoice(hash, amtPaid+600) + if err != nil { + t.Fatal("expected duplicate settle to succeed") + } + + // Check that settled amount remains unchanged. + inv, _, err := registry.LookupInvoice(hash) + if err != nil { + t.Fatal(err) + } + if inv.AmtPaid != amtPaid { + t.Fatal("expected amount to be unchanged") + } +} + +func newDB() (*channeldb.DB, func(), error) { + // First, create a temporary directory to be used for the duration of + // this test. + tempDirName, err := ioutil.TempDir("", "channeldb") + if err != nil { + return nil, nil, err + } + + // Next, create channeldb for the first time. + cdb, err := channeldb.Open(tempDirName) + if err != nil { + os.RemoveAll(tempDirName) + return nil, nil, err + } + + cleanUp := func() { + cdb.Close() + os.RemoveAll(tempDirName) + } + + return cdb, cleanUp, nil +}