multi: split database storage into remote and local instances

In this commit, we split the database storage into two instances: remote
and local data. If etcd isn't active, then everything is still stored
locally, though we now pass two pointers everywhere. If etcd is active,
then everything but the graph goes into the remote database.
Olaoluwa Osuntokun 2020-05-06 20:49:36 -07:00
parent 7355c8ba3a
commit f58b00ef55
8 changed files with 240 additions and 123 deletions
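
To make the resulting shape concrete, here is a minimal sketch of the two-pointer pattern the commit message describes. It is not part of the commit and uses a stand-in DB type rather than channeldb.DB:

package main

import "fmt"

// DB is a stand-in for channeldb.DB in this sketch.
type DB struct {
	name string
}

// openDatabases mirrors the idea behind initializeDatabases in lnd.go below:
// callers always receive both a local and a remote handle, and when no
// replicated (etcd) backend is configured the two pointers alias the same
// instance.
func openDatabases(remoteAvailable bool) (localDB, remoteDB *DB) {
	localDB = &DB{name: "local"}
	if !remoteAvailable {
		// No etcd: local and remote are the exact same database.
		return localDB, localDB
	}

	// etcd active: everything but the graph lives in the remote instance.
	return localDB, &DB{name: "remote"}
}

func main() {
	l, r := openDatabases(false)
	fmt.Println(l == r) // true: a single local database

	l, r = openDatabases(true)
	fmt.Println(l == r) // false: channel state is replicated remotely
}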


@ -161,7 +161,7 @@ type chainControl struct {
// full-node, another backed by a running bitcoind full-node, and the other
// backed by a running neutrino light client instance. When running with a
// neutrino light client instance, `neutrinoCS` must be non-nil.
func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB,
func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB,
privateWalletPw, publicWalletPw []byte, birthday time.Time,
recoveryWindow uint32, wallet *wallet.Wallet,
neutrinoCS *neutrino.ChainService) (*chainControl, error) {
@ -225,8 +225,9 @@ func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB,
if cfg.HeightHintCacheQueryDisable {
ltndLog.Infof("Height Hint Cache Queries disabled")
}
// Initialize the height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(heightHintCacheConfig, chanDB)
hintCache, err := chainntnfs.NewHeightHintCache(heightHintCacheConfig, localDB)
if err != nil {
return nil, fmt.Errorf("unable to initialize height hint "+
"cache: %v", err)
@ -523,7 +524,7 @@ func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB,
// Create, and start the lnwallet, which handles the core payment
// channel logic, and exposes control via proxy state machines.
walletCfg := lnwallet.Config{
Database: chanDB,
Database: remoteDB,
Notifier: cc.chainNotifier,
WalletController: wc,
Signer: cc.signer,


@ -126,6 +126,8 @@ var (
// estimatesmartfee RPC call.
defaultBitcoindEstimateMode = "CONSERVATIVE"
bitcoindEstimateModes = [2]string{"ECONOMICAL", defaultBitcoindEstimateMode}
defaultSphinxDbName = "sphinxreplay.db"
)
// Config defines the configuration options for lnd.

lnd.go

@ -246,50 +246,20 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
defer pprof.StopCPUProfile()
}
// Create the network-segmented directory for the channel database.
ltndLog.Infof("Opening the main database, this might take a few " +
"minutes...")
startOpenTime := time.Now()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if cfg.DB.Backend == lncfg.BoltBackend {
ltndLog.Infof("Opening bbolt database, sync_freelist=%v",
cfg.DB.Bolt.SyncFreelist)
}
databaseBackends, err := cfg.DB.GetBackends(
ctx, cfg.localDatabaseDir(), cfg.networkName(),
)
if err != nil {
ltndLog.Error(err)
return err
}
// Open the channeldb, which is dedicated to storing channel, and
// network related metadata.
chanDB, err := channeldb.CreateWithBackend(
databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg)
switch {
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("%v, exiting", err)
return nil
case err != nil:
ltndLog.Errorf("Unable to open channeldb: %v", err)
return err
return fmt.Errorf("unable to open databases: %v", err)
}
defer chanDB.Close()
openTime := time.Since(startOpenTime)
ltndLog.Infof("Database now open (time_to_open=%v)!", openTime)
defer cleanUp()
// Only process macaroons if --no-macaroons isn't set.
tlsCfg, restCreds, restProxyDest, err := getTLSConfig(cfg)
@ -461,8 +431,12 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
// With the information parsed from the configuration, create valid
// instances of the pertinent interfaces required to operate the
// Lightning Network Daemon.
//
// When we create the chain control, we need storage for the height
// hints and also the wallet itself, for these two we want them to be
// replicated, so we'll pass in the remote channel DB instance.
activeChainControl, err := newChainControlFromConfig(
cfg, chanDB, privateWalletPw, publicWalletPw,
cfg, localChanDB, remoteChanDB, privateWalletPw, publicWalletPw,
walletInitParams.Birthday, walletInitParams.RecoveryWindow,
walletInitParams.Wallet, neutrinoCS,
)
@ -617,9 +591,9 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
// Set up the core server which will listen for incoming peer
// connections.
server, err := newServer(
cfg, cfg.Listeners, chanDB, towerClientDB, activeChainControl,
&idKeyDesc, walletInitParams.ChansToRestore, chainedAcceptor,
torController,
cfg, cfg.Listeners, localChanDB, remoteChanDB, towerClientDB,
activeChainControl, &idKeyDesc, walletInitParams.ChansToRestore,
chainedAcceptor, torController,
)
if err != nil {
err := fmt.Errorf("unable to create server: %v", err)
@ -1140,3 +1114,125 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
return nil, fmt.Errorf("shutting down")
}
}
// initializeDatabases extracts the current databases that we'll use for normal
// operation in the daemon. Two databases are returned: one remote and one
// local. However, only if the replicated database is active will the remote
// database point to a unique database. Otherwise, the local and remote DB will
// both point to the same local database. A function closure that closes all
// opened databases is also returned.
func initializeDatabases(ctx context.Context,
cfg *Config) (*channeldb.DB, *channeldb.DB, func(), error) {
ltndLog.Infof("Opening the main database, this might take a few " +
"minutes...")
if cfg.DB.Backend == lncfg.BoltBackend {
ltndLog.Infof("Opening bbolt database, sync_freelist=%v",
cfg.DB.Bolt.SyncFreelist)
}
startOpenTime := time.Now()
databaseBackends, err := cfg.DB.GetBackends(
ctx, cfg.localDatabaseDir(), cfg.networkName(),
)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to obtain database "+
"backends: %v", err)
}
// If the remoteDB is nil, then we'll just open a local DB as normal,
// having the remote and local pointers be the exact same instance.
var (
localChanDB, remoteChanDB *channeldb.DB
closeFuncs []func()
)
if databaseBackends.RemoteDB == nil {
// Open the channeldb, which is dedicated to storing channel,
// and network related metadata.
localChanDB, err = channeldb.CreateWithBackend(
databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
switch {
case err == channeldb.ErrDryRunMigrationOK:
return nil, nil, nil, err
case err != nil:
err := fmt.Errorf("unable to open local channeldb: %v", err)
ltndLog.Error(err)
return nil, nil, nil, err
}
closeFuncs = append(closeFuncs, func() {
localChanDB.Close()
})
remoteChanDB = localChanDB
} else {
ltndLog.Infof("Database replication is available! Creating " +
"local and remote channeldb instances")
// Otherwise, we'll open two instances, one for the state we
// only need locally, and the other for things we want to
// ensure are replicated.
localChanDB, err = channeldb.CreateWithBackend(
databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
switch {
// As we want to allow both versions to get through the dry run
// migration, we'll only exit the second time here, once the
// remote instance has had a chance to migrate as well.
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("Local DB dry run migration successful")
case err != nil:
err := fmt.Errorf("unable to open local channeldb: %v", err)
ltndLog.Error(err)
return nil, nil, nil, err
}
closeFuncs = append(closeFuncs, func() {
localChanDB.Close()
})
ltndLog.Infof("Opening replicated database instance...")
remoteChanDB, err = channeldb.CreateWithBackend(
databaseBackends.RemoteDB,
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
switch {
case err == channeldb.ErrDryRunMigrationOK:
return nil, nil, nil, err
case err != nil:
localChanDB.Close()
err := fmt.Errorf("unable to open remote channeldb: %v", err)
ltndLog.Error(err)
return nil, nil, nil, err
}
closeFuncs = append(closeFuncs, func() {
remoteChanDB.Close()
})
}
openTime := time.Since(startOpenTime)
ltndLog.Infof("Database now open (time_to_open=%v)!", openTime)
cleanUp := func() {
for _, closeFunc := range closeFuncs {
closeFunc()
}
}
return localChanDB, remoteChanDB, cleanUp, nil
}
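
For reference, the caller-side contract of initializeDatabases, pulled together from the fragmented Main hunk above (dry-run migration handling elided), looks roughly like this:

localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg)
if err != nil {
	return fmt.Errorf("unable to open databases: %v", err)
}
defer cleanUp()

// The chain control receives both handles so it can choose the proper
// backend for each store (see the newChainControlFromConfig hunk at the
// top of this diff).
activeChainControl, err := newChainControlFromConfig(
	cfg, localChanDB, remoteChanDB, privateWalletPw, publicWalletPw,
	walletInitParams.Birthday, walletInitParams.RecoveryWindow,
	walletInitParams.Wallet, neutrinoCS,
)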


@ -456,7 +456,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// Before we register this new link with the HTLC Switch, we'll
// need to fetch its current link-layer forwarding policy from
// the database.
graph := p.cfg.ChannelDB.ChannelGraph()
graph := p.cfg.ChannelGraph
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
if err != nil && err != channeldb.ErrEdgeNotFound {
return nil, err


@ -90,10 +90,13 @@ type Config struct {
// ChannelLinkConfig.
InterceptSwitch *htlcswitch.InterceptableSwitch
// ChannelDB is used to fetch opened channels, closed channels, and the
// channel graph.
// ChannelDB is used to fetch opened channels and closed channels.
ChannelDB *channeldb.DB
// ChannelGraph is a pointer to the channel graph which is used to
// query information about the set of known active channels.
ChannelGraph *channeldb.ChannelGraph
// ChainArb is used to subscribe to channel events, update contract signals,
// and force close channels.
ChainArb *contractcourt.ChainArbitrator


@ -185,7 +185,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
WalletBalance: func() (btcutil.Amount, error) {
return svr.cc.wallet.ConfirmedBalance(cfg.MinConfs)
},
Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()),
Graph: autopilot.ChannelGraphFromDatabase(svr.localChanDB.ChannelGraph()),
Constraints: atplConstraints,
ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) {
// First, we'll check if we're already connected to the
@ -256,7 +256,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
// We'll fetch the current state of open
// channels from the database to use as initial
// state for the auto-pilot agent.
activeChannels, err := svr.chanDB.FetchAllChannels()
activeChannels, err := svr.remoteChanDB.FetchAllChannels()
if err != nil {
return nil, err
}


@ -522,12 +522,12 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
chanPredicate *chanacceptor.ChainedAcceptor) (*rpcServer, error) {
// Set up router rpc backend.
channelGraph := s.chanDB.ChannelGraph()
channelGraph := s.localChanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
}
graph := s.chanDB.ChannelGraph()
graph := s.localChanDB.ChannelGraph()
routerBackend := &routerrpc.RouterBackend{
SelfNode: selfNode.PubKeyBytes,
FetchChannelCapacity: func(chanID uint64) (btcutil.Amount,
@ -576,10 +576,12 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
// Before we create any of the sub-servers, we need to ensure that all
// the dependencies they need are properly populated within each sub
// server configuration struct.
//
// TODO(roasbeef): extend sub-server config to have both (local vs remote) DB
err = subServerCgs.PopulateDependencies(
cfg, s.cc, cfg.networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, activeNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.chanDB, s.sweeper, tower,
routerBackend, s.nodeSigner, s.remoteChanDB, s.sweeper, tower,
s.towerClient, cfg.net.ResolveTCPAddr, genInvoiceFeatures,
rpcsLog,
)
@ -1399,7 +1401,7 @@ func (r *rpcServer) VerifyMessage(ctx context.Context,
// channels signed the message.
//
// TODO(phlip9): Require valid nodes to have capital in active channels.
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
_, active, err := graph.HasLightningNode(pub)
if err != nil {
return nil, fmt.Errorf("failed to query graph: %v", err)
@ -1488,7 +1490,7 @@ func (r *rpcServer) DisconnectPeer(ctx context.Context,
// Next, we'll fetch the pending/active channels we have with a
// particular peer.
nodeChannels, err := r.server.chanDB.FetchOpenChannels(peerPubKey)
nodeChannels, err := r.server.remoteChanDB.FetchOpenChannels(peerPubKey)
if err != nil {
return nil, fmt.Errorf("unable to fetch channels for peer: %v", err)
}
@ -2065,7 +2067,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// First, we'll fetch the channel as is, as we'll need to examine it
// regardless of if this is a force close or not.
channel, err := r.server.chanDB.FetchChannel(*chanPoint)
channel, err := r.server.remoteChanDB.FetchChannel(*chanPoint)
if err != nil {
return err
}
@ -2329,7 +2331,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
return nil, err
}
dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
dbChan, err := r.server.remoteChanDB.FetchChannel(*chanPoint)
switch {
// If the channel isn't found in the set of open channels, then we can
// continue on as it can't be loaded into the link/peer.
@ -2360,12 +2362,12 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
court. Between any step it's possible that the user restarts the
// process all over again. As a result, each of the steps below are
// intended to be idempotent.
err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight))
err = r.server.remoteChanDB.AbandonChannel(chanPoint, uint32(bestHeight))
if err != nil {
return nil, err
}
err = abandonChanFromGraph(
r.server.chanDB.ChannelGraph(), chanPoint,
r.server.localChanDB.ChannelGraph(), chanPoint,
)
if err != nil {
return nil, err
@ -2399,7 +2401,7 @@ func (r *rpcServer) GetInfo(ctx context.Context,
serverPeers := r.server.Peers()
openChannels, err := r.server.chanDB.FetchAllOpenChannels()
openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
@ -2414,7 +2416,7 @@ func (r *rpcServer) GetInfo(ctx context.Context,
inactiveChannels := uint32(len(openChannels)) - activeChannels
pendingChannels, err := r.server.chanDB.FetchPendingChannels()
pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels()
if err != nil {
return nil, fmt.Errorf("unable to get retrieve pending "+
"channels: %v", err)
@ -2713,7 +2715,7 @@ func (r *rpcServer) WalletBalance(ctx context.Context,
func (r *rpcServer) ChannelBalance(ctx context.Context,
in *lnrpc.ChannelBalanceRequest) (*lnrpc.ChannelBalanceResponse, error) {
openChannels, err := r.server.chanDB.FetchAllOpenChannels()
openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
@ -2723,7 +2725,7 @@ func (r *rpcServer) ChannelBalance(ctx context.Context,
balance += channel.LocalCommitment.LocalBalance.ToSatoshis()
}
pendingChannels, err := r.server.chanDB.FetchPendingChannels()
pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels()
if err != nil {
return nil, err
}
@ -2766,7 +2768,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// First, we'll populate the response with all the channels that are
// soon to be opened. We can easily fetch this data from the database
// and map the db struct to the proto response.
pendingOpenChannels, err := r.server.chanDB.FetchPendingChannels()
pendingOpenChannels, err := r.server.remoteChanDB.FetchPendingChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch pending channels: %v", err)
return nil, err
@ -2814,7 +2816,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// Next, we'll examine the channels that are soon to be closed so we
// can populate these fields within the response.
pendingCloseChannels, err := r.server.chanDB.FetchClosedChannels(true)
pendingCloseChannels, err := r.server.remoteChanDB.FetchClosedChannels(true)
if err != nil {
rpcsLog.Errorf("unable to fetch closed channels: %v", err)
return nil, err
@ -2843,7 +2845,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// not found, or the channel itself, this channel was closed
// in a version before we started persisting historical
// channels, so we silence the error.
historical, err := r.server.chanDB.FetchHistoricalChannel(
historical, err := r.server.remoteChanDB.FetchHistoricalChannel(
&pendingClose.ChanPoint,
)
switch err {
@ -2918,7 +2920,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// We'll also fetch all channels that are open, but have had their
// commitment broadcasted, meaning they are waiting for the closing
// transaction to confirm.
waitingCloseChans, err := r.server.chanDB.FetchWaitingCloseChannels()
waitingCloseChans, err := r.server.remoteChanDB.FetchWaitingCloseChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch channels waiting close: %v",
err)
@ -3153,7 +3155,7 @@ func (r *rpcServer) ClosedChannels(ctx context.Context,
resp := &lnrpc.ClosedChannelsResponse{}
dbChannels, err := r.server.chanDB.FetchClosedChannels(false)
dbChannels, err := r.server.remoteChanDB.FetchClosedChannels(false)
if err != nil {
return nil, err
}
@ -3230,9 +3232,9 @@ func (r *rpcServer) ListChannels(ctx context.Context,
resp := &lnrpc.ListChannelsResponse{}
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
dbChannels, err := r.server.chanDB.FetchAllOpenChannels()
dbChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
@ -3549,7 +3551,8 @@ func (r *rpcServer) createRPCClosedChannel(
CloseInitiator: closeInitiator,
}
reports, err := r.server.chanDB.FetchChannelReports(
reports, err := r.server.remoteChanDB.FetchChannelReports(
*activeNetParams.GenesisHash, &dbChannel.ChanPoint,
)
switch err {
@ -3654,7 +3657,7 @@ func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) (
// To get the close initiator for cooperative closes, we need
// to get the channel status from the historical channel bucket.
histChan, err := r.server.chanDB.FetchHistoricalChannel(chanPoint)
histChan, err := r.server.remoteChanDB.FetchHistoricalChannel(chanPoint)
switch {
// The node has upgraded from a version where we did not store
// historical channels, and has not closed a channel since. Do
@ -3718,7 +3721,7 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
// the server, or client exits.
defer channelEventSub.Cancel()
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
for {
select {
@ -4526,7 +4529,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
ChainParams: activeNetParams.Params,
NodeSigner: r.server.nodeSigner,
DefaultCLTVExpiry: defaultDelta,
ChanDB: r.server.chanDB,
ChanDB: r.server.remoteChanDB,
GenInvoiceFeatures: func() *lnwire.FeatureVector {
return r.server.featureMgr.Get(feature.SetInvoice)
},
@ -4640,7 +4643,7 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
PendingOnly: req.PendingOnly,
Reversed: req.Reversed,
}
invoiceSlice, err := r.server.chanDB.QueryInvoices(q)
invoiceSlice, err := r.server.remoteChanDB.QueryInvoices(q)
if err != nil {
return nil, fmt.Errorf("unable to query invoices: %v", err)
}
@ -4806,7 +4809,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
// Obtain the pointer to the global singleton channel graph, this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
// First iterate through all the known nodes (connected or unconnected
// within the graph), collating their current state into the RPC
@ -4944,7 +4947,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
// Obtain the pointer to the global singleton channel graph, this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
// Calculate betweenness centrality if requested. Note that depending on the
// graph size, this may take up to a few minutes.
@ -4983,7 +4986,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
func (r *rpcServer) GetChanInfo(ctx context.Context,
in *lnrpc.ChanInfoRequest) (*lnrpc.ChannelEdge, error) {
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
edgeInfo, edge1, edge2, err := graph.FetchChannelEdgesByID(in.ChanId)
if err != nil {
@ -5003,7 +5006,7 @@ func (r *rpcServer) GetChanInfo(ctx context.Context,
func (r *rpcServer) GetNodeInfo(ctx context.Context,
in *lnrpc.NodeInfoRequest) (*lnrpc.NodeInfo, error) {
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
// First, parse the hex-encoded public key into a full in-memory public
// key object we can work with for querying.
@ -5101,7 +5104,7 @@ func (r *rpcServer) QueryRoutes(ctx context.Context,
func (r *rpcServer) GetNetworkInfo(ctx context.Context,
_ *lnrpc.NetworkInfoRequest) (*lnrpc.NetworkInfo, error) {
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
var (
numNodes uint32
@ -5381,7 +5384,7 @@ func (r *rpcServer) ListPayments(ctx context.Context,
query.MaxPayments = math.MaxUint64
}
paymentsQuerySlice, err := r.server.chanDB.QueryPayments(query)
paymentsQuerySlice, err := r.server.remoteChanDB.QueryPayments(query)
if err != nil {
return nil, err
}
@ -5413,7 +5416,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context,
rpcsLog.Debugf("[DeleteAllPayments]")
if err := r.server.chanDB.DeletePayments(); err != nil {
if err := r.server.remoteChanDB.DeletePayments(); err != nil {
return nil, err
}
@ -5533,7 +5536,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
rpcsLog.Debugf("[feereport]")
channelGraph := r.server.chanDB.ChannelGraph()
channelGraph := r.server.localChanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
@ -5572,7 +5575,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
return nil, err
}
fwdEventLog := r.server.chanDB.ForwardingLog()
fwdEventLog := r.server.remoteChanDB.ForwardingLog()
// computeFeeSum is a helper function that computes the total fees for
// a particular time slice described by a forwarding event query.
@ -5810,7 +5813,7 @@ func (r *rpcServer) ForwardingHistory(ctx context.Context,
IndexOffset: req.IndexOffset,
NumMaxEvents: numEvents,
}
timeSlice, err := r.server.chanDB.ForwardingLog().Query(eventQuery)
timeSlice, err := r.server.remoteChanDB.ForwardingLog().Query(eventQuery)
if err != nil {
return nil, fmt.Errorf("unable to query forwarding log: %v", err)
}
@ -5871,7 +5874,7 @@ func (r *rpcServer) ExportChannelBackup(ctx context.Context,
// the database. If this channel has been closed, or the outpoint is
// unknown, then we'll return an error
unpackedBackup, err := chanbackup.FetchBackupForChan(
chanPoint, r.server.chanDB,
chanPoint, r.server.remoteChanDB,
)
if err != nil {
return nil, err
@ -6041,7 +6044,7 @@ func (r *rpcServer) ExportAllChannelBackups(ctx context.Context,
// First, we'll attempt to read back ups for ALL currently opened
// channels from disk.
allUnpackedBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
r.server.remoteChanDB,
)
if err != nil {
return nil, fmt.Errorf("unable to fetch all static chan "+
@ -6064,7 +6067,7 @@ func (r *rpcServer) RestoreChannelBackups(ctx context.Context,
// restore either a set of chanbackup.Single or chanbackup.Multi
// backups.
chanRestorer := &chanDBRestorer{
db: r.server.chanDB,
db: r.server.remoteChanDB,
secretKeys: r.server.cc.keyRing,
chainArb: r.server.chainArb,
}
@ -6162,7 +6165,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
// we'll obtain the current set of single channel
// backups from disk.
chanBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
r.server.remoteChanDB,
)
if err != nil {
return fmt.Errorf("unable to fetch all "+


@ -204,7 +204,9 @@ type server struct {
fundingMgr *fundingManager
chanDB *channeldb.DB
localChanDB *channeldb.DB
remoteChanDB *channeldb.DB
htlcSwitch *htlcswitch.Switch
@ -328,7 +330,8 @@ func noiseDial(idKey keychain.SingleKeyECDH,
// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
func newServer(cfg *Config, listenAddrs []net.Addr,
localChanDB, remoteChanDB *channeldb.DB,
towerClientDB *wtdb.ClientDB, cc *chainControl,
nodeKeyDesc *keychain.KeyDescriptor,
chansToRestore walletunlocker.ChannelsToRecover,
@ -360,8 +363,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
copy(serializedPubKey[:], nodeKeyECDH.PubKey().SerializeCompressed())
// Initialize the sphinx router, placing its persistent replay log in
// the same directory as the channel graph database.
sharedSecretPath := filepath.Join(cfg.localDatabaseDir(), "sphinxreplay.db")
// the same directory as the channel graph database. We don't need to
// replicate this data, so we'll store it locally.
sharedSecretPath := filepath.Join(
cfg.localDatabaseDir(), defaultSphinxDbName,
)
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
sphinxRouter := sphinx.NewRouter(
nodeKeyECDH, activeNetParams.Params, replayLog,
@ -405,7 +411,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s := &server{
cfg: cfg,
chanDB: chanDB,
localChanDB: localChanDB,
remoteChanDB: remoteChanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer),
writePool: writePool,
@ -413,11 +420,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
chansToRestore: chansToRestore,
invoices: invoices.NewRegistry(
chanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
remoteChanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
&registryConfig,
),
channelNotifier: channelnotifier.New(chanDB),
channelNotifier: channelnotifier.New(remoteChanDB),
identityECDH: nodeKeyECDH,
nodeSigner: netann.NewNodeSigner(nodeKeySigner),
@ -449,7 +456,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
}
s.witnessBeacon = &preimageBeacon{
wCache: chanDB.NewWitnessCache(),
wCache: remoteChanDB.NewWitnessCache(),
subscribers: make(map[uint64]*preimageSubscriber),
}
@ -461,7 +468,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
DB: chanDB,
DB: remoteChanDB,
LocalChannelClose: func(pubKey []byte,
request *htlcswitch.ChanClose) {
@ -476,7 +483,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
peer.HandleLocalCloseChanReqs(request)
},
FwdingLog: chanDB.ForwardingLog(),
FwdingLog: remoteChanDB.ForwardingLog(),
SwitchPackager: channeldb.NewSwitchPackager(),
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: s.fetchLastChanUpdate(),
@ -503,8 +510,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
MessageSigner: s.nodeSigner,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate,
DB: chanDB,
Graph: chanDB.ChannelGraph(),
DB: remoteChanDB,
Graph: localChanDB.ChannelGraph(),
}
chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
@ -594,7 +601,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
selfAddrs := make([]net.Addr, 0, len(externalIPs))
selfAddrs = append(selfAddrs, externalIPs...)
chanGraph := chanDB.ChannelGraph()
// As the graph can be obtained at any time from the network, we won't
// replicate it, and instead it'll only be stored locally.
chanGraph := localChanDB.ChannelGraph()
// We'll now reconstruct a node announcement based on our current
// configuration so we can send it out as a sort of heart beat within
@ -661,7 +670,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
// The router will get access to the payment ID sequencer, such that it
// can generate unique payment IDs.
sequencer, err := htlcswitch.NewPersistentSequencer(chanDB)
sequencer, err := htlcswitch.NewPersistentSequencer(remoteChanDB)
if err != nil {
return nil, err
}
@ -694,7 +703,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
s.missionControl, err = routing.NewMissionControl(
chanDB,
remoteChanDB,
&routing.MissionControlConfig{
AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
@ -727,7 +736,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
PathFindingConfig: pathFindingConfig,
}
paymentControl := channeldb.NewPaymentControl(chanDB)
paymentControl := channeldb.NewPaymentControl(remoteChanDB)
s.controlTower = routing.NewControlTower(paymentControl)
@ -751,12 +760,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
return nil, fmt.Errorf("can't create router: %v", err)
}
chanSeries := discovery.NewChanSeries(s.chanDB.ChannelGraph())
gossipMessageStore, err := discovery.NewMessageStore(s.chanDB)
chanSeries := discovery.NewChanSeries(s.localChanDB.ChannelGraph())
gossipMessageStore, err := discovery.NewMessageStore(s.remoteChanDB)
if err != nil {
return nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB)
waitingProofStore, err := channeldb.NewWaitingProofStore(s.remoteChanDB)
if err != nil {
return nil, err
}
@ -793,10 +802,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
ForAllOutgoingChannels: s.chanRouter.ForAllOutgoingChannels,
PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
FetchChannel: s.chanDB.FetchChannel,
FetchChannel: s.remoteChanDB.FetchChannel,
}
utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
utxnStore, err := newNurseryStore(
activeNetParams.GenesisHash, remoteChanDB,
)
if err != nil {
srvrLog.Errorf("unable to create nursery store: %v", err)
return nil, err
@ -806,7 +817,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
sweep.DefaultBatchWindowDuration)
sweeperStore, err := sweep.NewSweeperStore(
chanDB, activeNetParams.GenesisHash,
remoteChanDB, activeNetParams.GenesisHash,
)
if err != nil {
srvrLog.Errorf("unable to create sweeper store: %v", err)
@ -833,8 +844,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s.utxoNursery = newUtxoNursery(&NurseryConfig{
ChainIO: cc.chainIO,
ConfDepth: 1,
FetchClosedChannels: chanDB.FetchClosedChannels,
FetchClosedChannel: chanDB.FetchClosedChannel,
FetchClosedChannels: remoteChanDB.FetchClosedChannels,
FetchClosedChannel: remoteChanDB.FetchClosedChannel,
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
Store: utxnStore,
@ -935,18 +946,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC,
Clock: clock.NewDefaultClock(),
}, chanDB)
}, remoteChanDB)
s.breachArbiter = newBreachArbiter(&BreachConfig{
CloseLink: closeLink,
DB: chanDB,
DB: remoteChanDB,
Estimator: s.cc.feeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.wallet),
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.wallet.Cfg.Signer,
Store: newRetributionStore(chanDB),
Store: newRetributionStore(remoteChanDB),
})
// Select the configuration and funding parameters for Bitcoin or
@ -997,7 +1008,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
FindChannel: func(chanID lnwire.ChannelID) (
*channeldb.OpenChannel, error) {
dbChannels, err := chanDB.FetchAllChannels()
dbChannels, err := remoteChanDB.FetchAllChannels()
if err != nil {
return nil, err
}
@ -1161,10 +1172,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
// static backup of the latest channel state.
chanNotifier := &channelNotifier{
chanNotifier: s.channelNotifier,
addrs: s.chanDB,
addrs: s.remoteChanDB,
}
backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
startingChans, err := chanbackup.FetchStaticChanBackups(s.chanDB)
startingChans, err := chanbackup.FetchStaticChanBackups(s.remoteChanDB)
if err != nil {
return nil, err
}
@ -1183,7 +1194,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents,
SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents,
GetOpenChannels: s.chanDB.FetchAllOpenChannels,
GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels,
})
if cfg.WtClient.Active {
@ -1384,7 +1395,7 @@ func (s *server) Start() error {
// that have all the information we need to handle channel
// recovery _before_ we even accept connections from any peers.
chanRestorer := &chanDBRestorer{
db: s.chanDB,
db: s.remoteChanDB,
secretKeys: s.cc.keyRing,
chainArb: s.chainArb,
}
@ -1424,7 +1435,7 @@ func (s *server) Start() error {
// we'll prune our set of link nodes found within the database
// to ensure we don't reconnect to any nodes we no longer have
// open channels with.
if err := s.chanDB.PruneLinkNodes(); err != nil {
if err := s.remoteChanDB.PruneLinkNodes(); err != nil {
startErr = err
return
}
@ -1730,7 +1741,7 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e
// First, we'll create an instance of the ChannelGraphBootstrapper as
// this can be used by default if we've already partially seeded the
// network.
chanGraph := autopilot.ChannelGraphFromDatabase(s.chanDB.ChannelGraph())
chanGraph := autopilot.ChannelGraphFromDatabase(s.localChanDB.ChannelGraph())
graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
if err != nil {
return nil, err
@ -2062,7 +2073,7 @@ func (s *server) createNewHiddenService() error {
AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
}
copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
if err := s.chanDB.ChannelGraph().SetSourceNode(selfNode); err != nil {
if err := s.localChanDB.ChannelGraph().SetSourceNode(selfNode); err != nil {
return fmt.Errorf("can't set self node: %v", err)
}
@ -2119,7 +2130,7 @@ func (s *server) establishPersistentConnections() error {
// Iterate through the list of LinkNodes to find addresses we should
// attempt to connect to based on our set of previous connections. Set
// the reconnection port to the default peer port.
linkNodes, err := s.chanDB.FetchAllLinkNodes()
linkNodes, err := s.remoteChanDB.FetchAllLinkNodes()
if err != nil && err != channeldb.ErrLinkNodesNotFound {
return err
}
@ -2135,7 +2146,7 @@ func (s *server) establishPersistentConnections() error {
// After checking our previous connections for addresses to connect to,
// iterate through the nodes in our channel graph to find addresses
// that have been added via NodeAnnouncement messages.
chanGraph := s.chanDB.ChannelGraph()
chanGraph := s.localChanDB.ChannelGraph()
sourceNode, err := chanGraph.SourceNode()
if err != nil {
return err
@ -2842,7 +2853,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
ReadPool: s.readPool,
Switch: s.htlcSwitch,
InterceptSwitch: s.interceptableSwitch,
ChannelDB: s.chanDB,
ChannelDB: s.remoteChanDB,
ChannelGraph: s.localChanDB.ChannelGraph(),
ChainArb: s.chainArb,
AuthGossiper: s.authGossiper,
ChanStatusMgr: s.chanStatusMgr,
@ -3543,7 +3555,7 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
return nil, err
}
node, err := s.chanDB.ChannelGraph().FetchLightningNode(nil, vertex)
node, err := s.localChanDB.ChannelGraph().FetchLightningNode(nil, vertex)
if err != nil {
return nil, err
}