From c1eaf6000030b672be5e4d6ecffcd3dd8057d3b9 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 20 Dec 2018 11:57:44 +0100 Subject: [PATCH] invoices: create package This commit isolates the invoice registry in a separate package. It is a preparation for the creation of an invoices sub server. --- .../invoiceregistry.go | 91 ++++++++++--------- invoices/log.go | 45 +++++++++ log.go | 4 + rpcserver.go | 3 +- server.go | 9 +- witness_beacon.go | 3 +- 6 files changed, 107 insertions(+), 48 deletions(-) rename invoiceregistry.go => invoices/invoiceregistry.go (85%) create mode 100644 invoices/log.go diff --git a/invoiceregistry.go b/invoices/invoiceregistry.go similarity index 85% rename from invoiceregistry.go rename to invoices/invoiceregistry.go index d8a0fd5f..48123624 100644 --- a/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -1,9 +1,10 @@ -package main +package invoices import ( "bytes" "crypto/sha256" "fmt" + "github.com/btcsuite/btcd/chaincfg" "sync" "sync/atomic" "time" @@ -18,29 +19,30 @@ import ( ) var ( - // debugPre is the default debug preimage which is inserted into the + // DebugPre is the default debug preimage which is inserted into the // invoice registry if the --debughtlc flag is activated on start up. // All nodes initialized with the flag active will immediately settle // any incoming HTLC whose rHash corresponds with the debug // preimage. - debugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32)) + DebugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32)) - debugHash = chainhash.Hash(sha256.Sum256(debugPre[:])) + // DebugHash is the hash of the default preimage. + DebugHash = chainhash.Hash(sha256.Sum256(DebugPre[:])) ) -// invoiceRegistry is a central registry of all the outstanding invoices +// InvoiceRegistry is a central registry of all the outstanding invoices // created by the daemon. The registry is a thin wrapper around a map in order // to ensure that all updates/reads are thread safe. -type invoiceRegistry struct { +type InvoiceRegistry struct { sync.RWMutex cdb *channeldb.DB clientMtx sync.Mutex nextClientID uint32 - notificationClients map[uint32]*invoiceSubscription + notificationClients map[uint32]*InvoiceSubscription - newSubscriptions chan *invoiceSubscription + newSubscriptions chan *InvoiceSubscription subscriptionCancels chan uint32 invoiceEvents chan *invoiceEvent @@ -49,28 +51,33 @@ type invoiceRegistry struct { // that *all* nodes are able to fully settle. debugInvoices map[chainhash.Hash]*channeldb.Invoice + activeNetParams *chaincfg.Params + wg sync.WaitGroup quit chan struct{} } -// newInvoiceRegistry creates a new invoice registry. The invoice registry +// NewRegistry creates a new invoice registry. The invoice registry // wraps the persistent on-disk invoice storage with an additional in-memory // layer. The in-memory layer is in place such that debug invoices can be added // which are volatile yet available system wide within the daemon. -func newInvoiceRegistry(cdb *channeldb.DB) *invoiceRegistry { - return &invoiceRegistry{ +func NewRegistry(cdb *channeldb.DB, + activeNetParams *chaincfg.Params) *InvoiceRegistry { + + return &InvoiceRegistry{ cdb: cdb, debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice), - notificationClients: make(map[uint32]*invoiceSubscription), - newSubscriptions: make(chan *invoiceSubscription), + notificationClients: make(map[uint32]*InvoiceSubscription), + newSubscriptions: make(chan *InvoiceSubscription), subscriptionCancels: make(chan uint32), invoiceEvents: make(chan *invoiceEvent, 100), + activeNetParams: activeNetParams, quit: make(chan struct{}), } } // Start starts the registry and all goroutines it needs to carry out its task. -func (i *invoiceRegistry) Start() error { +func (i *InvoiceRegistry) Start() error { i.wg.Add(1) go i.invoiceEventNotifier() @@ -79,7 +86,7 @@ func (i *invoiceRegistry) Start() error { } // Stop signals the registry for a graceful shutdown. -func (i *invoiceRegistry) Stop() { +func (i *InvoiceRegistry) Stop() { close(i.quit) i.wg.Wait() @@ -96,7 +103,7 @@ type invoiceEvent struct { // invoiceEventNotifier is the dedicated goroutine responsible for accepting // new notification subscriptions, cancelling old subscriptions, and // dispatching new invoice events. -func (i *invoiceRegistry) invoiceEventNotifier() { +func (i *InvoiceRegistry) invoiceEventNotifier() { defer i.wg.Done() for { @@ -110,11 +117,11 @@ func (i *invoiceRegistry) invoiceEventNotifier() { // invoice events. err := i.deliverBacklogEvents(newClient) if err != nil { - ltndLog.Errorf("unable to deliver backlog invoice "+ + log.Errorf("unable to deliver backlog invoice "+ "notifications: %v", err) } - ltndLog.Infof("New invoice subscription "+ + log.Infof("New invoice subscription "+ "client: id=%v", newClient.id) // With the backlog notifications delivered (if any), @@ -125,7 +132,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() { // A client no longer wishes to receive invoice notifications. // So we'll remove them from the set of active clients. case clientID := <-i.subscriptionCancels: - ltndLog.Infof("Cancelling invoice subscription for "+ + log.Infof("Cancelling invoice subscription for "+ "client=%v", clientID) delete(i.notificationClients, clientID) @@ -157,14 +164,14 @@ func (i *invoiceRegistry) invoiceEventNotifier() { // instance. case event.state == channeldb.ContractOpen && client.addIndex+1 != invoice.AddIndex: - ltndLog.Warnf("client=%v for invoice "+ + log.Warnf("client=%v for invoice "+ "notifications missed an update, "+ "add_index=%v, new add event index=%v", clientID, client.addIndex, invoice.AddIndex) case event.state == channeldb.ContractSettled && client.settleIndex+1 != invoice.SettleIndex: - ltndLog.Warnf("client=%v for invoice "+ + log.Warnf("client=%v for invoice "+ "notifications missed an update, "+ "settle_index=%v, new settle event index=%v", clientID, client.settleIndex, @@ -192,7 +199,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() { case channeldb.ContractOpen: client.addIndex = invoice.AddIndex default: - ltndLog.Errorf("unknown invoice "+ + log.Errorf("unknown invoice "+ "state: %v", event.state) } } @@ -205,7 +212,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() { // deliverBacklogEvents will attempts to query the invoice database for any // notifications that the client has missed since it reconnected last. -func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) error { +func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error { // First, we'll query the database to see if based on the provided // addIndex and settledIndex we need to deliver any backlog // notifications. @@ -257,7 +264,7 @@ func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) erro // by the passed preimage. Once this invoice is added, subsystems within the // daemon add/forward HTLCs that are able to obtain the proper preimage // required for redemption in the case that we're the final destination. -func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) { +func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) { paymentHash := chainhash.Hash(sha256.Sum256(preimage[:])) invoice := &channeldb.Invoice{ @@ -272,7 +279,7 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash i.debugInvoices[paymentHash] = invoice i.Unlock() - ltndLog.Debugf("Adding debug invoice %v", newLogClosure(func() string { + log.Debugf("Adding debug invoice %v", newLogClosure(func() string { return spew.Sdump(invoice) })) } @@ -284,11 +291,11 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash // redemption in the case that we're the final destination. We also return the // addIndex of the newly created invoice which monotonically increases for each // new invoice added. -func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) { +func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) { i.Lock() defer i.Unlock() - ltndLog.Debugf("Adding invoice %v", newLogClosure(func() string { + log.Debugf("Adding invoice %v", newLogClosure(func() string { return spew.Sdump(invoice) })) @@ -311,7 +318,7 @@ func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) // according to the cltv delta. // // TODO(roasbeef): ignore if settled? -func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) { +func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) { // First check the in-memory debug invoice index to see if this is an // existing invoice added for debugging. i.RLock() @@ -331,7 +338,7 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice } payReq, err := zpay32.Decode( - string(invoice.PaymentRequest), activeNetParams.Params, + string(invoice.PaymentRequest), i.activeNetParams, ) if err != nil { return channeldb.Invoice{}, 0, err @@ -343,13 +350,13 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice // SettleInvoice attempts to mark an invoice as settled. If the invoice is a // debug invoice, then this method is a noop as debug invoices are never fully // settled. -func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash, +func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash, amtPaid lnwire.MilliSatoshi) error { i.Lock() defer i.Unlock() - ltndLog.Debugf("Settling invoice %x", rHash[:]) + log.Debugf("Settling invoice %x", rHash[:]) // First check the in-memory debug invoice index to see if this is an // existing invoice added for debugging. @@ -366,7 +373,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash, return err } - ltndLog.Infof("Payment received: %v", spew.Sdump(invoice)) + log.Infof("Payment received: %v", spew.Sdump(invoice)) i.notifyClients(invoice, channeldb.ContractSettled) @@ -375,7 +382,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash, // notifyClients notifies all currently registered invoice notification clients // of a newly added/settled invoice. -func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice, +func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice, state channeldb.ContractState) { event := &invoiceEvent{ @@ -389,12 +396,12 @@ func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice, } } -// invoiceSubscription represents an intent to receive updates for newly added +// InvoiceSubscription represents an intent to receive updates for newly added // or settled invoices. For each newly added invoice, a copy of the invoice // will be sent over the NewInvoices channel. Similarly, for each newly settled // invoice, a copy of the invoice will be sent over the SettledInvoices // channel. -type invoiceSubscription struct { +type InvoiceSubscription struct { cancelled uint32 // To be used atomically. // NewInvoices is a channel that we'll use to send all newly created @@ -424,16 +431,16 @@ type invoiceSubscription struct { id uint32 - inv *invoiceRegistry + inv *InvoiceRegistry cancelChan chan struct{} wg sync.WaitGroup } -// Cancel unregisters the invoiceSubscription, freeing any previously allocated +// Cancel unregisters the InvoiceSubscription, freeing any previously allocated // resources. -func (i *invoiceSubscription) Cancel() { +func (i *InvoiceSubscription) Cancel() { if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) { return } @@ -449,13 +456,13 @@ func (i *invoiceSubscription) Cancel() { i.wg.Wait() } -// SubscribeNotifications returns an invoiceSubscription which allows the +// SubscribeNotifications returns an InvoiceSubscription which allows the // caller to receive async notifications when any invoices are settled or // added. The invoiceIndex parameter is a streaming "checkpoint". We'll start // by first sending out all new events with an invoice index _greater_ than // this value. Afterwards, we'll send out real-time notifications. -func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *invoiceSubscription { - client := &invoiceSubscription{ +func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription { + client := &InvoiceSubscription{ NewInvoices: make(chan *channeldb.Invoice), SettledInvoices: make(chan *channeldb.Invoice), addIndex: addIndex, @@ -495,7 +502,7 @@ func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * case channeldb.ContractSettled: targetChan = client.SettledInvoices default: - ltndLog.Errorf("unknown invoice "+ + log.Errorf("unknown invoice "+ "state: %v", invoiceEvent.state) continue diff --git a/invoices/log.go b/invoices/log.go new file mode 100644 index 00000000..71e23c50 --- /dev/null +++ b/invoices/log.go @@ -0,0 +1,45 @@ +package invoices + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("INVC", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} + +// logClosure is used to provide a closure over expensive logging operations so +// don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/log.go b/log.go index 21f647c0..bbde642e 100644 --- a/log.go +++ b/log.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/lightningnetwork/lnd/invoices" "io" "os" "path/filepath" @@ -71,6 +72,7 @@ var ( sgnrLog = build.NewSubLogger("SGNR", backendLog.Logger) wlktLog = build.NewSubLogger("WLKT", backendLog.Logger) arpcLog = build.NewSubLogger("ARPC", backendLog.Logger) + invcLog = build.NewSubLogger("INVC", backendLog.Logger) ) // Initialize package-global logger variables. @@ -91,6 +93,7 @@ func init() { signrpc.UseLogger(sgnrLog) walletrpc.UseLogger(wlktLog) autopilotrpc.UseLogger(arpcLog) + invoices.UseLogger(invcLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -117,6 +120,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "SGNR": sgnrLog, "WLKT": wlktLog, "ARPC": arpcLog, + "INVC": invcLog, } // initLogRotator initializes the logging rotator to write logs to logFile and diff --git a/rpcserver.go b/rpcserver.go index ae6e992f..11274b7d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -31,6 +31,7 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" @@ -2608,7 +2609,7 @@ func extractPaymentIntent(rpcPayReq *rpcPaymentRequest) (rpcPaymentIntent, error // same debug rHash. Otherwise, we pay to the rHash specified within // the RPC request. case cfg.DebugHTLC && bytes.Equal(payIntent.rHash[:], zeroHash[:]): - copy(payIntent.rHash[:], debugHash[:]) + copy(payIntent.rHash[:], invoices.DebugHash[:]) default: copy(payIntent.rHash[:], rpcPayReq.PaymentHash) diff --git a/server.go b/server.go index b4693f3d..ab5f39bc 100644 --- a/server.go +++ b/server.go @@ -31,6 +31,7 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" @@ -143,7 +144,7 @@ type server struct { htlcSwitch *htlcswitch.Switch - invoices *invoiceRegistry + invoices *invoices.InvoiceRegistry witnessBeacon contractcourt.WitnessBeacon @@ -266,7 +267,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, cc: cc, sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), - invoices: newInvoiceRegistry(chanDB), + invoices: invoices.NewRegistry(chanDB, activeNetParams.Params), identityPriv: privKey, nodeSigner: newNodeSigner(privKey), @@ -306,9 +307,9 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, // HTLCs with the debug R-Hash immediately settled. if cfg.DebugHTLC { kiloCoin := btcutil.Amount(btcutil.SatoshiPerBitcoin * 1000) - s.invoices.AddDebugInvoice(kiloCoin, *debugPre) + s.invoices.AddDebugInvoice(kiloCoin, *invoices.DebugPre) srvrLog.Debugf("Debug HTLC invoice inserted, preimage=%x, hash=%x", - debugPre[:], debugHash[:]) + invoices.DebugPre[:], invoices.DebugHash[:]) } _, currentHeight, err := s.cc.chainIO.GetBestBlock() diff --git a/witness_beacon.go b/witness_beacon.go index a3b90bc6..04cc7466 100644 --- a/witness_beacon.go +++ b/witness_beacon.go @@ -6,6 +6,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" + "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnwallet" ) @@ -23,7 +24,7 @@ type preimageSubscriber struct { type preimageBeacon struct { sync.RWMutex - invoices *invoiceRegistry + invoices *invoices.InvoiceRegistry wCache *channeldb.WitnessCache