diff --git a/chainregistry.go b/chainregistry.go index 3d750a3e..aa2df62c 100644 --- a/chainregistry.go +++ b/chainregistry.go @@ -161,7 +161,7 @@ type chainControl struct { // full-node, another backed by a running bitcoind full-node, and the other // backed by a running neutrino light client instance. When running with a // neutrino light client instance, `neutrinoCS` must be non-nil. -func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB, +func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB, privateWalletPw, publicWalletPw []byte, birthday time.Time, recoveryWindow uint32, wallet *wallet.Wallet, neutrinoCS *neutrino.ChainService) (*chainControl, error) { @@ -225,8 +225,9 @@ func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB, if cfg.HeightHintCacheQueryDisable { ltndLog.Infof("Height Hint Cache Queries disabled") } + // Initialize the height hint cache within the chain directory. - hintCache, err := chainntnfs.NewHeightHintCache(heightHintCacheConfig, chanDB) + hintCache, err := chainntnfs.NewHeightHintCache(heightHintCacheConfig, localDB) if err != nil { return nil, fmt.Errorf("unable to initialize height hint "+ "cache: %v", err) @@ -523,7 +524,7 @@ func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB, // Create, and start the lnwallet, which handles the core payment // channel logic, and exposes control via proxy state machines. walletCfg := lnwallet.Config{ - Database: chanDB, + Database: remoteDB, Notifier: cc.chainNotifier, WalletController: wc, Signer: cc.signer, diff --git a/config.go b/config.go index 2e505855..4463c006 100644 --- a/config.go +++ b/config.go @@ -126,6 +126,8 @@ var ( // estimatesmartfee RPC call. defaultBitcoindEstimateMode = "CONSERVATIVE" bitcoindEstimateModes = [2]string{"ECONOMICAL", defaultBitcoindEstimateMode} + + defaultSphinxDbName = "sphinxreplay.db" ) // Config defines the configuration options for lnd. diff --git a/lnd.go b/lnd.go index bf121275..21a6923d 100644 --- a/lnd.go +++ b/lnd.go @@ -246,50 +246,20 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { defer pprof.StopCPUProfile() } - // Create the network-segmented directory for the channel database. - ltndLog.Infof("Opening the main database, this might take a few " + - "minutes...") - - startOpenTime := time.Now() - ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() - if cfg.DB.Backend == lncfg.BoltBackend { - ltndLog.Infof("Opening bbolt database, sync_freelist=%v", - cfg.DB.Bolt.SyncFreelist) - } - - databaseBackends, err := cfg.DB.GetBackends( - ctx, cfg.localDatabaseDir(), cfg.networkName(), - ) - if err != nil { - ltndLog.Error(err) - return err - } - - // Open the channeldb, which is dedicated to storing channel, and - // network related metadata. - chanDB, err := channeldb.CreateWithBackend( - databaseBackends.LocalDB, - channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), - channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), - channeldb.OptionDryRunMigration(cfg.DryRunMigration), - ) + localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg) switch { case err == channeldb.ErrDryRunMigrationOK: ltndLog.Infof("%v, exiting", err) return nil - case err != nil: - ltndLog.Errorf("Unable to open channeldb: %v", err) - return err + return fmt.Errorf("unable to open databases: %v", err) } - defer chanDB.Close() - openTime := time.Since(startOpenTime) - ltndLog.Infof("Database now open (time_to_open=%v)!", openTime) + defer cleanUp() // Only process macaroons if --no-macaroons isn't set. tlsCfg, restCreds, restProxyDest, err := getTLSConfig(cfg) @@ -461,8 +431,12 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { // With the information parsed from the configuration, create valid // instances of the pertinent interfaces required to operate the // Lightning Network Daemon. + // + // When we create the chain control, we need storage for the height + // hints and also the wallet itself, for these two we want them to be + // replicated, so we'll pass in the remote channel DB instance. activeChainControl, err := newChainControlFromConfig( - cfg, chanDB, privateWalletPw, publicWalletPw, + cfg, localChanDB, remoteChanDB, privateWalletPw, publicWalletPw, walletInitParams.Birthday, walletInitParams.RecoveryWindow, walletInitParams.Wallet, neutrinoCS, ) @@ -617,9 +591,9 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { // Set up the core server which will listen for incoming peer // connections. server, err := newServer( - cfg, cfg.Listeners, chanDB, towerClientDB, activeChainControl, - &idKeyDesc, walletInitParams.ChansToRestore, chainedAcceptor, - torController, + cfg, cfg.Listeners, localChanDB, remoteChanDB, towerClientDB, + activeChainControl, &idKeyDesc, walletInitParams.ChansToRestore, + chainedAcceptor, torController, ) if err != nil { err := fmt.Errorf("unable to create server: %v", err) @@ -1140,3 +1114,125 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, return nil, fmt.Errorf("shutting down") } } + +// initializeDatabases extracts the current databases that we'll use for normal +// operation in the daemon. Two databases are returned: one remote and one +// local. However, only if the replicated database is active will the remote +// database point to a unique database. Otherwise, the local and remote DB will +// both point to the same local database. A function closure that closes all +// opened databases is also returned. +func initializeDatabases(ctx context.Context, + cfg *Config) (*channeldb.DB, *channeldb.DB, func(), error) { + + ltndLog.Infof("Opening the main database, this might take a few " + + "minutes...") + + if cfg.DB.Backend == lncfg.BoltBackend { + ltndLog.Infof("Opening bbolt database, sync_freelist=%v", + cfg.DB.Bolt.SyncFreelist) + } + + startOpenTime := time.Now() + + databaseBackends, err := cfg.DB.GetBackends( + ctx, cfg.localDatabaseDir(), cfg.networkName(), + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to obtain database "+ + "backends: %v", err) + } + + // If the remoteDB is nil, then we'll just open a local DB as normal, + // having the remote and local pointer be the exact same instance. + var ( + localChanDB, remoteChanDB *channeldb.DB + closeFuncs []func() + ) + if databaseBackends.RemoteDB == nil { + // Open the channeldb, which is dedicated to storing channel, + // and network related metadata. + localChanDB, err = channeldb.CreateWithBackend( + databaseBackends.LocalDB, + channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), + channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), + channeldb.OptionDryRunMigration(cfg.DryRunMigration), + ) + switch { + case err == channeldb.ErrDryRunMigrationOK: + return nil, nil, nil, err + + case err != nil: + err := fmt.Errorf("unable to open local channeldb: %v", err) + ltndLog.Error(err) + return nil, nil, nil, err + } + + closeFuncs = append(closeFuncs, func() { + localChanDB.Close() + }) + + remoteChanDB = localChanDB + } else { + ltndLog.Infof("Database replication is available! Creating " + + "local and remote channeldb instances") + + // Otherwise, we'll open two instances, one for the state we + // only need locally, and the other for things we want to + // ensure are replicated. + localChanDB, err = channeldb.CreateWithBackend( + databaseBackends.LocalDB, + channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), + channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), + channeldb.OptionDryRunMigration(cfg.DryRunMigration), + ) + switch { + // As we want to allow both versions to get thru the dry run + // migration, we'll only exit the second time here once the + // remote instance has had a time to migrate as well. + case err == channeldb.ErrDryRunMigrationOK: + ltndLog.Infof("Local DB dry run migration successful") + + case err != nil: + err := fmt.Errorf("unable to open local channeldb: %v", err) + ltndLog.Error(err) + return nil, nil, nil, err + } + + closeFuncs = append(closeFuncs, func() { + localChanDB.Close() + }) + + ltndLog.Infof("Opening replicated database instance...") + + remoteChanDB, err = channeldb.CreateWithBackend( + databaseBackends.RemoteDB, + channeldb.OptionDryRunMigration(cfg.DryRunMigration), + ) + switch { + case err == channeldb.ErrDryRunMigrationOK: + return nil, nil, nil, err + + case err != nil: + localChanDB.Close() + + err := fmt.Errorf("unable to open remote channeldb: %v", err) + ltndLog.Error(err) + return nil, nil, nil, err + } + + closeFuncs = append(closeFuncs, func() { + remoteChanDB.Close() + }) + } + + openTime := time.Since(startOpenTime) + ltndLog.Infof("Database now open (time_to_open=%v)!", openTime) + + cleanUp := func() { + for _, closeFunc := range closeFuncs { + closeFunc() + } + } + + return localChanDB, remoteChanDB, cleanUp, nil +} diff --git a/peer/brontide.go b/peer/brontide.go index 948900f1..e1ecaf88 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -456,7 +456,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( // Before we register this new link with the HTLC Switch, we'll // need to fetch its current link-layer forwarding policy from // the database. - graph := p.cfg.ChannelDB.ChannelGraph() + graph := p.cfg.ChannelGraph info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) if err != nil && err != channeldb.ErrEdgeNotFound { return nil, err diff --git a/peer/config.go b/peer/config.go index 8c0f9313..99b6e28c 100644 --- a/peer/config.go +++ b/peer/config.go @@ -90,10 +90,13 @@ type Config struct { // ChannelLinkConfig. InterceptSwitch *htlcswitch.InterceptableSwitch - // ChannelDB is used to fetch opened channels, closed channels, and the - // channel graph. + // ChannelDB is used to fetch opened channels, and closed channels. ChannelDB *channeldb.DB + // ChannelGraph is a pointer to the channel graph which is used to + // query information about the set of known active channels. + ChannelGraph *channeldb.ChannelGraph + // ChainArb is used to subscribe to channel events, update contract signals, // and force close channels. ChainArb *contractcourt.ChainArbitrator diff --git a/pilot.go b/pilot.go index 9ce83906..e0eb223e 100644 --- a/pilot.go +++ b/pilot.go @@ -185,7 +185,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot, WalletBalance: func() (btcutil.Amount, error) { return svr.cc.wallet.ConfirmedBalance(cfg.MinConfs) }, - Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), + Graph: autopilot.ChannelGraphFromDatabase(svr.localChanDB.ChannelGraph()), Constraints: atplConstraints, ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) { // First, we'll check if we're already connected to the @@ -256,7 +256,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot, // We'll fetch the current state of open // channels from the database to use as initial // state for the auto-pilot agent. - activeChannels, err := svr.chanDB.FetchAllChannels() + activeChannels, err := svr.remoteChanDB.FetchAllChannels() if err != nil { return nil, err } diff --git a/rpcserver.go b/rpcserver.go index 4dfb5879..1cac4037 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -522,12 +522,12 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, chanPredicate *chanacceptor.ChainedAcceptor) (*rpcServer, error) { // Set up router rpc backend. - channelGraph := s.chanDB.ChannelGraph() + channelGraph := s.localChanDB.ChannelGraph() selfNode, err := channelGraph.SourceNode() if err != nil { return nil, err } - graph := s.chanDB.ChannelGraph() + graph := s.localChanDB.ChannelGraph() routerBackend := &routerrpc.RouterBackend{ SelfNode: selfNode.PubKeyBytes, FetchChannelCapacity: func(chanID uint64) (btcutil.Amount, @@ -576,10 +576,12 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, // Before we create any of the sub-servers, we need to ensure that all // the dependencies they need are properly populated within each sub // server configuration struct. + // + // TODO(roasbeef): extend sub-sever config to have both (local vs remote) DB err = subServerCgs.PopulateDependencies( cfg, s.cc, cfg.networkDir, macService, atpl, invoiceRegistry, s.htlcSwitch, activeNetParams.Params, s.chanRouter, - routerBackend, s.nodeSigner, s.chanDB, s.sweeper, tower, + routerBackend, s.nodeSigner, s.remoteChanDB, s.sweeper, tower, s.towerClient, cfg.net.ResolveTCPAddr, genInvoiceFeatures, rpcsLog, ) @@ -1399,7 +1401,7 @@ func (r *rpcServer) VerifyMessage(ctx context.Context, // channels signed the message. // // TODO(phlip9): Require valid nodes to have capital in active channels. - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() _, active, err := graph.HasLightningNode(pub) if err != nil { return nil, fmt.Errorf("failed to query graph: %v", err) @@ -1488,7 +1490,7 @@ func (r *rpcServer) DisconnectPeer(ctx context.Context, // Next, we'll fetch the pending/active channels we have with a // particular peer. - nodeChannels, err := r.server.chanDB.FetchOpenChannels(peerPubKey) + nodeChannels, err := r.server.remoteChanDB.FetchOpenChannels(peerPubKey) if err != nil { return nil, fmt.Errorf("unable to fetch channels for peer: %v", err) } @@ -2065,7 +2067,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, // First, we'll fetch the channel as is, as we'll need to examine it // regardless of if this is a force close or not. - channel, err := r.server.chanDB.FetchChannel(*chanPoint) + channel, err := r.server.remoteChanDB.FetchChannel(*chanPoint) if err != nil { return err } @@ -2329,7 +2331,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, return nil, err } - dbChan, err := r.server.chanDB.FetchChannel(*chanPoint) + dbChan, err := r.server.remoteChanDB.FetchChannel(*chanPoint) switch { // If the channel isn't found in the set of open channels, then we can // continue on as it can't be loaded into the link/peer. @@ -2360,12 +2362,12 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, // court. Between any step it's possible that the users restarts the // process all over again. As a result, each of the steps below are // intended to be idempotent. - err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight)) + err = r.server.remoteChanDB.AbandonChannel(chanPoint, uint32(bestHeight)) if err != nil { return nil, err } err = abandonChanFromGraph( - r.server.chanDB.ChannelGraph(), chanPoint, + r.server.localChanDB.ChannelGraph(), chanPoint, ) if err != nil { return nil, err @@ -2399,7 +2401,7 @@ func (r *rpcServer) GetInfo(ctx context.Context, serverPeers := r.server.Peers() - openChannels, err := r.server.chanDB.FetchAllOpenChannels() + openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels() if err != nil { return nil, err } @@ -2414,7 +2416,7 @@ func (r *rpcServer) GetInfo(ctx context.Context, inactiveChannels := uint32(len(openChannels)) - activeChannels - pendingChannels, err := r.server.chanDB.FetchPendingChannels() + pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels() if err != nil { return nil, fmt.Errorf("unable to get retrieve pending "+ "channels: %v", err) @@ -2713,7 +2715,7 @@ func (r *rpcServer) WalletBalance(ctx context.Context, func (r *rpcServer) ChannelBalance(ctx context.Context, in *lnrpc.ChannelBalanceRequest) (*lnrpc.ChannelBalanceResponse, error) { - openChannels, err := r.server.chanDB.FetchAllOpenChannels() + openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels() if err != nil { return nil, err } @@ -2723,7 +2725,7 @@ func (r *rpcServer) ChannelBalance(ctx context.Context, balance += channel.LocalCommitment.LocalBalance.ToSatoshis() } - pendingChannels, err := r.server.chanDB.FetchPendingChannels() + pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels() if err != nil { return nil, err } @@ -2766,7 +2768,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // First, we'll populate the response with all the channels that are // soon to be opened. We can easily fetch this data from the database // and map the db struct to the proto response. - pendingOpenChannels, err := r.server.chanDB.FetchPendingChannels() + pendingOpenChannels, err := r.server.remoteChanDB.FetchPendingChannels() if err != nil { rpcsLog.Errorf("unable to fetch pending channels: %v", err) return nil, err @@ -2814,7 +2816,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // Next, we'll examine the channels that are soon to be closed so we // can populate these fields within the response. - pendingCloseChannels, err := r.server.chanDB.FetchClosedChannels(true) + pendingCloseChannels, err := r.server.remoteChanDB.FetchClosedChannels(true) if err != nil { rpcsLog.Errorf("unable to fetch closed channels: %v", err) return nil, err @@ -2843,7 +2845,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // not found, or the channel itself, this channel was closed // in a version before we started persisting historical // channels, so we silence the error. - historical, err := r.server.chanDB.FetchHistoricalChannel( + historical, err := r.server.remoteChanDB.FetchHistoricalChannel( &pendingClose.ChanPoint, ) switch err { @@ -2918,7 +2920,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // We'll also fetch all channels that are open, but have had their // commitment broadcasted, meaning they are waiting for the closing // transaction to confirm. - waitingCloseChans, err := r.server.chanDB.FetchWaitingCloseChannels() + waitingCloseChans, err := r.server.remoteChanDB.FetchWaitingCloseChannels() if err != nil { rpcsLog.Errorf("unable to fetch channels waiting close: %v", err) @@ -3153,7 +3155,7 @@ func (r *rpcServer) ClosedChannels(ctx context.Context, resp := &lnrpc.ClosedChannelsResponse{} - dbChannels, err := r.server.chanDB.FetchClosedChannels(false) + dbChannels, err := r.server.remoteChanDB.FetchClosedChannels(false) if err != nil { return nil, err } @@ -3230,9 +3232,9 @@ func (r *rpcServer) ListChannels(ctx context.Context, resp := &lnrpc.ListChannelsResponse{} - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() - dbChannels, err := r.server.chanDB.FetchAllOpenChannels() + dbChannels, err := r.server.remoteChanDB.FetchAllOpenChannels() if err != nil { return nil, err } @@ -3549,7 +3551,8 @@ func (r *rpcServer) createRPCClosedChannel( CloseInitiator: closeInitiator, } - reports, err := r.server.chanDB.FetchChannelReports( + reports, err := r.server.remoteChanDB.FetchChannelReports( + *activeNetParams.GenesisHash, &dbChannel.ChanPoint, ) switch err { @@ -3654,7 +3657,7 @@ func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) ( // To get the close initiator for cooperative closes, we need // to get the channel status from the historical channel bucket. - histChan, err := r.server.chanDB.FetchHistoricalChannel(chanPoint) + histChan, err := r.server.remoteChanDB.FetchHistoricalChannel(chanPoint) switch { // The node has upgraded from a version where we did not store // historical channels, and has not closed a channel since. Do @@ -3718,7 +3721,7 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription, // the server, or client exits. defer channelEventSub.Cancel() - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() for { select { @@ -4526,7 +4529,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context, ChainParams: activeNetParams.Params, NodeSigner: r.server.nodeSigner, DefaultCLTVExpiry: defaultDelta, - ChanDB: r.server.chanDB, + ChanDB: r.server.remoteChanDB, GenInvoiceFeatures: func() *lnwire.FeatureVector { return r.server.featureMgr.Get(feature.SetInvoice) }, @@ -4640,7 +4643,7 @@ func (r *rpcServer) ListInvoices(ctx context.Context, PendingOnly: req.PendingOnly, Reversed: req.Reversed, } - invoiceSlice, err := r.server.chanDB.QueryInvoices(q) + invoiceSlice, err := r.server.remoteChanDB.QueryInvoices(q) if err != nil { return nil, fmt.Errorf("unable to query invoices: %v", err) } @@ -4806,7 +4809,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context, // Obtain the pointer to the global singleton channel graph, this will // provide a consistent view of the graph due to bolt db's // transactional model. - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() // First iterate through all the known nodes (connected or unconnected // within the graph), collating their current state into the RPC @@ -4944,7 +4947,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context, // Obtain the pointer to the global singleton channel graph, this will // provide a consistent view of the graph due to bolt db's // transactional model. - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() // Calculate betweenness centrality if requested. Note that depending on the // graph size, this may take up to a few minutes. @@ -4983,7 +4986,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context, func (r *rpcServer) GetChanInfo(ctx context.Context, in *lnrpc.ChanInfoRequest) (*lnrpc.ChannelEdge, error) { - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() edgeInfo, edge1, edge2, err := graph.FetchChannelEdgesByID(in.ChanId) if err != nil { @@ -5003,7 +5006,7 @@ func (r *rpcServer) GetChanInfo(ctx context.Context, func (r *rpcServer) GetNodeInfo(ctx context.Context, in *lnrpc.NodeInfoRequest) (*lnrpc.NodeInfo, error) { - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() // First, parse the hex-encoded public key into a full in-memory public // key object we can work with for querying. @@ -5101,7 +5104,7 @@ func (r *rpcServer) QueryRoutes(ctx context.Context, func (r *rpcServer) GetNetworkInfo(ctx context.Context, _ *lnrpc.NetworkInfoRequest) (*lnrpc.NetworkInfo, error) { - graph := r.server.chanDB.ChannelGraph() + graph := r.server.localChanDB.ChannelGraph() var ( numNodes uint32 @@ -5381,7 +5384,7 @@ func (r *rpcServer) ListPayments(ctx context.Context, query.MaxPayments = math.MaxUint64 } - paymentsQuerySlice, err := r.server.chanDB.QueryPayments(query) + paymentsQuerySlice, err := r.server.remoteChanDB.QueryPayments(query) if err != nil { return nil, err } @@ -5413,7 +5416,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context, rpcsLog.Debugf("[DeleteAllPayments]") - if err := r.server.chanDB.DeletePayments(); err != nil { + if err := r.server.remoteChanDB.DeletePayments(); err != nil { return nil, err } @@ -5533,7 +5536,7 @@ func (r *rpcServer) FeeReport(ctx context.Context, rpcsLog.Debugf("[feereport]") - channelGraph := r.server.chanDB.ChannelGraph() + channelGraph := r.server.localChanDB.ChannelGraph() selfNode, err := channelGraph.SourceNode() if err != nil { return nil, err @@ -5572,7 +5575,7 @@ func (r *rpcServer) FeeReport(ctx context.Context, return nil, err } - fwdEventLog := r.server.chanDB.ForwardingLog() + fwdEventLog := r.server.remoteChanDB.ForwardingLog() // computeFeeSum is a helper function that computes the total fees for // a particular time slice described by a forwarding event query. @@ -5810,7 +5813,7 @@ func (r *rpcServer) ForwardingHistory(ctx context.Context, IndexOffset: req.IndexOffset, NumMaxEvents: numEvents, } - timeSlice, err := r.server.chanDB.ForwardingLog().Query(eventQuery) + timeSlice, err := r.server.remoteChanDB.ForwardingLog().Query(eventQuery) if err != nil { return nil, fmt.Errorf("unable to query forwarding log: %v", err) } @@ -5871,7 +5874,7 @@ func (r *rpcServer) ExportChannelBackup(ctx context.Context, // the database. If this channel has been closed, or the outpoint is // unknown, then we'll return an error unpackedBackup, err := chanbackup.FetchBackupForChan( - chanPoint, r.server.chanDB, + chanPoint, r.server.remoteChanDB, ) if err != nil { return nil, err @@ -6041,7 +6044,7 @@ func (r *rpcServer) ExportAllChannelBackups(ctx context.Context, // First, we'll attempt to read back ups for ALL currently opened // channels from disk. allUnpackedBackups, err := chanbackup.FetchStaticChanBackups( - r.server.chanDB, + r.server.remoteChanDB, ) if err != nil { return nil, fmt.Errorf("unable to fetch all static chan "+ @@ -6064,7 +6067,7 @@ func (r *rpcServer) RestoreChannelBackups(ctx context.Context, // restore either a set of chanbackup.Single or chanbackup.Multi // backups. chanRestorer := &chanDBRestorer{ - db: r.server.chanDB, + db: r.server.remoteChanDB, secretKeys: r.server.cc.keyRing, chainArb: r.server.chainArb, } @@ -6162,7 +6165,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription // we'll obtains the current set of single channel // backups from disk. chanBackups, err := chanbackup.FetchStaticChanBackups( - r.server.chanDB, + r.server.remoteChanDB, ) if err != nil { return fmt.Errorf("unable to fetch all "+ diff --git a/server.go b/server.go index fe711cec..234d37a8 100644 --- a/server.go +++ b/server.go @@ -204,7 +204,9 @@ type server struct { fundingMgr *fundingManager - chanDB *channeldb.DB + localChanDB *channeldb.DB + + remoteChanDB *channeldb.DB htlcSwitch *htlcswitch.Switch @@ -328,7 +330,8 @@ func noiseDial(idKey keychain.SingleKeyECDH, // newServer creates a new instance of the server which is to listen using the // passed listener address. -func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, +func newServer(cfg *Config, listenAddrs []net.Addr, + localChanDB, remoteChanDB *channeldb.DB, towerClientDB *wtdb.ClientDB, cc *chainControl, nodeKeyDesc *keychain.KeyDescriptor, chansToRestore walletunlocker.ChannelsToRecover, @@ -360,8 +363,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, copy(serializedPubKey[:], nodeKeyECDH.PubKey().SerializeCompressed()) // Initialize the sphinx router, placing it's persistent replay log in - // the same directory as the channel graph database. - sharedSecretPath := filepath.Join(cfg.localDatabaseDir(), "sphinxreplay.db") + // the same directory as the channel graph database. We don't need to + // replicate this data, so we'll store it locally. + sharedSecretPath := filepath.Join( + cfg.localDatabaseDir(), defaultSphinxDbName, + ) replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier) sphinxRouter := sphinx.NewRouter( nodeKeyECDH, activeNetParams.Params, replayLog, @@ -405,7 +411,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, s := &server{ cfg: cfg, - chanDB: chanDB, + localChanDB: localChanDB, + remoteChanDB: remoteChanDB, cc: cc, sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer), writePool: writePool, @@ -413,11 +420,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, chansToRestore: chansToRestore, invoices: invoices.NewRegistry( - chanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()), + remoteChanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()), ®istryConfig, ), - channelNotifier: channelnotifier.New(chanDB), + channelNotifier: channelnotifier.New(remoteChanDB), identityECDH: nodeKeyECDH, nodeSigner: netann.NewNodeSigner(nodeKeySigner), @@ -449,7 +456,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, } s.witnessBeacon = &preimageBeacon{ - wCache: chanDB.NewWitnessCache(), + wCache: remoteChanDB.NewWitnessCache(), subscribers: make(map[uint64]*preimageSubscriber), } @@ -461,7 +468,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now) s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{ - DB: chanDB, + DB: remoteChanDB, LocalChannelClose: func(pubKey []byte, request *htlcswitch.ChanClose) { @@ -476,7 +483,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, peer.HandleLocalCloseChanReqs(request) }, - FwdingLog: chanDB.ForwardingLog(), + FwdingLog: remoteChanDB.ForwardingLog(), SwitchPackager: channeldb.NewSwitchPackager(), ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, FetchLastChannelUpdate: s.fetchLastChanUpdate(), @@ -503,8 +510,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, MessageSigner: s.nodeSigner, IsChannelActive: s.htlcSwitch.HasActiveLink, ApplyChannelUpdate: s.applyChannelUpdate, - DB: chanDB, - Graph: chanDB.ChannelGraph(), + DB: remoteChanDB, + Graph: localChanDB.ChannelGraph(), } chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg) @@ -594,7 +601,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, selfAddrs := make([]net.Addr, 0, len(externalIPs)) selfAddrs = append(selfAddrs, externalIPs...) - chanGraph := chanDB.ChannelGraph() + // As the graph can be obtained at anytime from the network, we won't + // replicate it, and instead it'll only be stored locally. + chanGraph := localChanDB.ChannelGraph() // We'll now reconstruct a node announcement based on our current // configuration so we can send it out as a sort of heart beat within @@ -661,7 +670,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, // The router will get access to the payment ID sequencer, such that it // can generate unique payment IDs. - sequencer, err := htlcswitch.NewPersistentSequencer(chanDB) + sequencer, err := htlcswitch.NewPersistentSequencer(remoteChanDB) if err != nil { return nil, err } @@ -694,7 +703,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) s.missionControl, err = routing.NewMissionControl( - chanDB, + remoteChanDB, &routing.MissionControlConfig{ AprioriHopProbability: routingConfig.AprioriHopProbability, PenaltyHalfLife: routingConfig.PenaltyHalfLife, @@ -727,7 +736,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, PathFindingConfig: pathFindingConfig, } - paymentControl := channeldb.NewPaymentControl(chanDB) + paymentControl := channeldb.NewPaymentControl(remoteChanDB) s.controlTower = routing.NewControlTower(paymentControl) @@ -751,12 +760,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, return nil, fmt.Errorf("can't create router: %v", err) } - chanSeries := discovery.NewChanSeries(s.chanDB.ChannelGraph()) - gossipMessageStore, err := discovery.NewMessageStore(s.chanDB) + chanSeries := discovery.NewChanSeries(s.localChanDB.ChannelGraph()) + gossipMessageStore, err := discovery.NewMessageStore(s.remoteChanDB) if err != nil { return nil, err } - waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB) + waitingProofStore, err := channeldb.NewWaitingProofStore(s.remoteChanDB) if err != nil { return nil, err } @@ -793,10 +802,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, ForAllOutgoingChannels: s.chanRouter.ForAllOutgoingChannels, PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, - FetchChannel: s.chanDB.FetchChannel, + FetchChannel: s.remoteChanDB.FetchChannel, } - utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB) + utxnStore, err := newNurseryStore( + activeNetParams.GenesisHash, remoteChanDB, + ) if err != nil { srvrLog.Errorf("unable to create nursery store: %v", err) return nil, err @@ -806,7 +817,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, sweep.DefaultBatchWindowDuration) sweeperStore, err := sweep.NewSweeperStore( - chanDB, activeNetParams.GenesisHash, + remoteChanDB, activeNetParams.GenesisHash, ) if err != nil { srvrLog.Errorf("unable to create sweeper store: %v", err) @@ -833,8 +844,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, s.utxoNursery = newUtxoNursery(&NurseryConfig{ ChainIO: cc.chainIO, ConfDepth: 1, - FetchClosedChannels: chanDB.FetchClosedChannels, - FetchClosedChannel: chanDB.FetchClosedChannel, + FetchClosedChannels: remoteChanDB.FetchClosedChannels, + FetchClosedChannel: remoteChanDB.FetchClosedChannel, Notifier: cc.chainNotifier, PublishTransaction: cc.wallet.PublishTransaction, Store: utxnStore, @@ -935,18 +946,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, Clock: clock.NewDefaultClock(), - }, chanDB) + }, remoteChanDB) s.breachArbiter = newBreachArbiter(&BreachConfig{ CloseLink: closeLink, - DB: chanDB, + DB: remoteChanDB, Estimator: s.cc.feeEstimator, GenSweepScript: newSweepPkScriptGen(cc.wallet), Notifier: cc.chainNotifier, PublishTransaction: cc.wallet.PublishTransaction, ContractBreaches: contractBreaches, Signer: cc.wallet.Cfg.Signer, - Store: newRetributionStore(chanDB), + Store: newRetributionStore(remoteChanDB), }) // Select the configuration and furnding parameters for Bitcoin or @@ -997,7 +1008,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, FindChannel: func(chanID lnwire.ChannelID) ( *channeldb.OpenChannel, error) { - dbChannels, err := chanDB.FetchAllChannels() + dbChannels, err := remoteChanDB.FetchAllChannels() if err != nil { return nil, err } @@ -1161,10 +1172,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, // static backup of the latest channel state. chanNotifier := &channelNotifier{ chanNotifier: s.channelNotifier, - addrs: s.chanDB, + addrs: s.remoteChanDB, } backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath) - startingChans, err := chanbackup.FetchStaticChanBackups(s.chanDB) + startingChans, err := chanbackup.FetchStaticChanBackups(s.remoteChanDB) if err != nil { return nil, err } @@ -1183,7 +1194,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{ SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents, SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents, - GetOpenChannels: s.chanDB.FetchAllOpenChannels, + GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels, }) if cfg.WtClient.Active { @@ -1384,7 +1395,7 @@ func (s *server) Start() error { // that have all the information we need to handle channel // recovery _before_ we even accept connections from any peers. chanRestorer := &chanDBRestorer{ - db: s.chanDB, + db: s.remoteChanDB, secretKeys: s.cc.keyRing, chainArb: s.chainArb, } @@ -1424,7 +1435,7 @@ func (s *server) Start() error { // we'll prune our set of link nodes found within the database // to ensure we don't reconnect to any nodes we no longer have // open channels with. - if err := s.chanDB.PruneLinkNodes(); err != nil { + if err := s.remoteChanDB.PruneLinkNodes(); err != nil { startErr = err return } @@ -1730,7 +1741,7 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e // First, we'll create an instance of the ChannelGraphBootstrapper as // this can be used by default if we've already partially seeded the // network. - chanGraph := autopilot.ChannelGraphFromDatabase(s.chanDB.ChannelGraph()) + chanGraph := autopilot.ChannelGraphFromDatabase(s.localChanDB.ChannelGraph()) graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph) if err != nil { return nil, err @@ -2062,7 +2073,7 @@ func (s *server) createNewHiddenService() error { AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(), } copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed()) - if err := s.chanDB.ChannelGraph().SetSourceNode(selfNode); err != nil { + if err := s.localChanDB.ChannelGraph().SetSourceNode(selfNode); err != nil { return fmt.Errorf("can't set self node: %v", err) } @@ -2119,7 +2130,7 @@ func (s *server) establishPersistentConnections() error { // Iterate through the list of LinkNodes to find addresses we should // attempt to connect to based on our set of previous connections. Set // the reconnection port to the default peer port. - linkNodes, err := s.chanDB.FetchAllLinkNodes() + linkNodes, err := s.remoteChanDB.FetchAllLinkNodes() if err != nil && err != channeldb.ErrLinkNodesNotFound { return err } @@ -2135,7 +2146,7 @@ func (s *server) establishPersistentConnections() error { // After checking our previous connections for addresses to connect to, // iterate through the nodes in our channel graph to find addresses // that have been added via NodeAnnouncement messages. - chanGraph := s.chanDB.ChannelGraph() + chanGraph := s.localChanDB.ChannelGraph() sourceNode, err := chanGraph.SourceNode() if err != nil { return err @@ -2842,7 +2853,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, ReadPool: s.readPool, Switch: s.htlcSwitch, InterceptSwitch: s.interceptableSwitch, - ChannelDB: s.chanDB, + ChannelDB: s.localChanDB, + ChannelGraph: s.remoteChanDB.ChannelGraph(), ChainArb: s.chainArb, AuthGossiper: s.authGossiper, ChanStatusMgr: s.chanStatusMgr, @@ -3543,7 +3555,7 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error) return nil, err } - node, err := s.chanDB.ChannelGraph().FetchLightningNode(nil, vertex) + node, err := s.localChanDB.ChannelGraph().FetchLightningNode(nil, vertex) if err != nil { return nil, err }