Merge pull request #4348 from Roasbeef/etcd-local-graph-db

multi: create split local/remote database structure
This commit is contained in:
Olaoluwa Osuntokun 2020-08-07 19:50:02 -07:00 committed by GitHub
commit 38265b9a46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 279 additions and 129 deletions

@ -161,7 +161,7 @@ type chainControl struct {
// full-node, another backed by a running bitcoind full-node, and the other
// backed by a running neutrino light client instance. When running with a
// neutrino light client instance, `neutrinoCS` must be non-nil.
func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB,
func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB,
privateWalletPw, publicWalletPw []byte, birthday time.Time,
recoveryWindow uint32, wallet *wallet.Wallet,
neutrinoCS *neutrino.ChainService) (*chainControl, error) {
@ -225,8 +225,9 @@ func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB,
if cfg.HeightHintCacheQueryDisable {
ltndLog.Infof("Height Hint Cache Queries disabled")
}
// Initialize the height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(heightHintCacheConfig, chanDB)
hintCache, err := chainntnfs.NewHeightHintCache(heightHintCacheConfig, localDB)
if err != nil {
return nil, fmt.Errorf("unable to initialize height hint "+
"cache: %v", err)
@ -523,7 +524,7 @@ func newChainControlFromConfig(cfg *Config, chanDB *channeldb.DB,
// Create, and start the lnwallet, which handles the core payment
// channel logic, and exposes control via proxy state machines.
walletCfg := lnwallet.Config{
Database: chanDB,
Database: remoteDB,
Notifier: cc.chainNotifier,
WalletController: wc,
Signer: cc.signer,

@ -126,6 +126,8 @@ var (
// estimatesmartfee RPC call.
defaultBitcoindEstimateMode = "CONSERVATIVE"
bitcoindEstimateModes = [2]string{"ECONOMICAL", defaultBitcoindEstimateMode}
defaultSphinxDbName = "sphinxreplay.db"
)
// Config defines the configuration options for lnd.

@ -48,18 +48,50 @@ func (db *DB) Validate() error {
return nil
}
// GetBackend returns a kvdb.Backend as set in the DB config.
func (db *DB) GetBackend(ctx context.Context, dbPath string,
networkName string) (kvdb.Backend, error) {
// DatabaseBackends is a two-tuple that holds the set of active database
// backends for the daemon. The two backends we expose are the local database
// backend, and the remote backend. The LocalDB attribute will always be
// populated. However, the remote DB will only be set if a replicated database
// is active.
type DatabaseBackends struct {
// LocalDB points to the local non-replicated backend.
LocalDB kvdb.Backend
// RemoteDB points to a possibly networked replicated backend. If no
// replicated backend is active, then this pointer will be nil.
RemoteDB kvdb.Backend
}
// GetBackends returns a set of kvdb.Backends as set in the DB config. The
// local database will ALWAYS be non-nil, while the remote database will only
// be populated if etcd is specified.
func (db *DB) GetBackends(ctx context.Context, dbPath string,
networkName string) (*DatabaseBackends, error) {
var (
localDB, remoteDB kvdb.Backend
err error
)
if db.Backend == EtcdBackend {
// Prefix will separate key/values in the db.
return kvdb.GetEtcdBackend(ctx, networkName, db.Etcd)
remoteDB, err = kvdb.GetEtcdBackend(ctx, networkName, db.Etcd)
if err != nil {
return nil, err
}
}
// The implementation by walletdb accepts "noFreelistSync" as the
// second parameter, so we negate here.
return kvdb.GetBoltBackend(dbPath, dbName, !db.Bolt.SyncFreelist)
localDB, err = kvdb.GetBoltBackend(
dbPath, dbName, !db.Bolt.SyncFreelist,
)
if err != nil {
return nil, err
}
return &DatabaseBackends{
LocalDB: localDB,
RemoteDB: remoteDB,
}, nil
}
// Compile-time constraint to ensure Workers implements the Validator interface.

169
lnd.go

@ -246,49 +246,20 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
defer pprof.StopCPUProfile()
}
// Create the network-segmented directory for the channel database.
ltndLog.Infof("Opening the main database, this might take a few " +
"minutes...")
startOpenTime := time.Now()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if cfg.DB.Backend == lncfg.BoltBackend {
ltndLog.Infof("Opening bbolt database, sync_freelist=%v",
cfg.DB.Bolt.SyncFreelist)
}
chanDbBackend, err := cfg.DB.GetBackend(ctx,
cfg.localDatabaseDir(), cfg.networkName(),
)
if err != nil {
ltndLog.Error(err)
return err
}
// Open the channeldb, which is dedicated to storing channel, and
// network related metadata.
chanDB, err := channeldb.CreateWithBackend(
chanDbBackend,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg)
switch {
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("%v, exiting", err)
return nil
case err != nil:
ltndLog.Errorf("Unable to open channeldb: %v", err)
return err
return fmt.Errorf("unable to open databases: %v", err)
}
defer chanDB.Close()
openTime := time.Since(startOpenTime)
ltndLog.Infof("Database now open (time_to_open=%v)!", openTime)
defer cleanUp()
// Only process macaroons if --no-macaroons isn't set.
tlsCfg, restCreds, restProxyDest, err := getTLSConfig(cfg)
@ -460,8 +431,12 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
// With the information parsed from the configuration, create valid
// instances of the pertinent interfaces required to operate the
// Lightning Network Daemon.
//
// When we create the chain control, we need storage for the height
// hints and also the wallet itself, for these two we want them to be
// replicated, so we'll pass in the remote channel DB instance.
activeChainControl, err := newChainControlFromConfig(
cfg, chanDB, privateWalletPw, publicWalletPw,
cfg, localChanDB, remoteChanDB, privateWalletPw, publicWalletPw,
walletInitParams.Birthday, walletInitParams.RecoveryWindow,
walletInitParams.Wallet, neutrinoCS,
)
@ -616,9 +591,9 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
// Set up the core server which will listen for incoming peer
// connections.
server, err := newServer(
cfg, cfg.Listeners, chanDB, towerClientDB, activeChainControl,
&idKeyDesc, walletInitParams.ChansToRestore, chainedAcceptor,
torController,
cfg, cfg.Listeners, localChanDB, remoteChanDB, towerClientDB,
activeChainControl, &idKeyDesc, walletInitParams.ChansToRestore,
chainedAcceptor, torController,
)
if err != nil {
err := fmt.Errorf("unable to create server: %v", err)
@ -1139,3 +1114,125 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
return nil, fmt.Errorf("shutting down")
}
}
// initializeDatabases extracts the current databases that we'll use for normal
// operation in the daemon. Two databases are returned: one remote and one
// local. However, only if the replicated database is active will the remote
// database point to a unique database. Otherwise, the local and remote DB will
// both point to the same local database. A function closure that closes all
// opened databases is also returned.
func initializeDatabases(ctx context.Context,
cfg *Config) (*channeldb.DB, *channeldb.DB, func(), error) {
ltndLog.Infof("Opening the main database, this might take a few " +
"minutes...")
if cfg.DB.Backend == lncfg.BoltBackend {
ltndLog.Infof("Opening bbolt database, sync_freelist=%v",
cfg.DB.Bolt.SyncFreelist)
}
startOpenTime := time.Now()
databaseBackends, err := cfg.DB.GetBackends(
ctx, cfg.localDatabaseDir(), cfg.networkName(),
)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to obtain database "+
"backends: %v", err)
}
// If the remoteDB is nil, then we'll just open a local DB as normal,
// having the remote and local pointer be the exact same instance.
var (
localChanDB, remoteChanDB *channeldb.DB
closeFuncs []func()
)
if databaseBackends.RemoteDB == nil {
// Open the channeldb, which is dedicated to storing channel,
// and network related metadata.
localChanDB, err = channeldb.CreateWithBackend(
databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
switch {
case err == channeldb.ErrDryRunMigrationOK:
return nil, nil, nil, err
case err != nil:
err := fmt.Errorf("unable to open local channeldb: %v", err)
ltndLog.Error(err)
return nil, nil, nil, err
}
closeFuncs = append(closeFuncs, func() {
localChanDB.Close()
})
remoteChanDB = localChanDB
} else {
ltndLog.Infof("Database replication is available! Creating " +
"local and remote channeldb instances")
// Otherwise, we'll open two instances, one for the state we
// only need locally, and the other for things we want to
// ensure are replicated.
localChanDB, err = channeldb.CreateWithBackend(
databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
switch {
// As we want to allow both versions to get thru the dry run
// migration, we'll only exit the second time here once the
// remote instance has had a time to migrate as well.
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("Local DB dry run migration successful")
case err != nil:
err := fmt.Errorf("unable to open local channeldb: %v", err)
ltndLog.Error(err)
return nil, nil, nil, err
}
closeFuncs = append(closeFuncs, func() {
localChanDB.Close()
})
ltndLog.Infof("Opening replicated database instance...")
remoteChanDB, err = channeldb.CreateWithBackend(
databaseBackends.RemoteDB,
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
)
switch {
case err == channeldb.ErrDryRunMigrationOK:
return nil, nil, nil, err
case err != nil:
localChanDB.Close()
err := fmt.Errorf("unable to open remote channeldb: %v", err)
ltndLog.Error(err)
return nil, nil, nil, err
}
closeFuncs = append(closeFuncs, func() {
remoteChanDB.Close()
})
}
openTime := time.Since(startOpenTime)
ltndLog.Infof("Database now open (time_to_open=%v)!", openTime)
cleanUp := func() {
for _, closeFunc := range closeFuncs {
closeFunc()
}
}
return localChanDB, remoteChanDB, cleanUp, nil
}

@ -456,7 +456,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// Before we register this new link with the HTLC Switch, we'll
// need to fetch its current link-layer forwarding policy from
// the database.
graph := p.cfg.ChannelDB.ChannelGraph()
graph := p.cfg.ChannelGraph
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
if err != nil && err != channeldb.ErrEdgeNotFound {
return nil, err

@ -90,10 +90,13 @@ type Config struct {
// ChannelLinkConfig.
InterceptSwitch *htlcswitch.InterceptableSwitch
// ChannelDB is used to fetch opened channels, closed channels, and the
// channel graph.
// ChannelDB is used to fetch opened channels, and closed channels.
ChannelDB *channeldb.DB
// ChannelGraph is a pointer to the channel graph which is used to
// query information about the set of known active channels.
ChannelGraph *channeldb.ChannelGraph
// ChainArb is used to subscribe to channel events, update contract signals,
// and force close channels.
ChainArb *contractcourt.ChainArbitrator

@ -185,7 +185,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
WalletBalance: func() (btcutil.Amount, error) {
return svr.cc.wallet.ConfirmedBalance(cfg.MinConfs)
},
Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()),
Graph: autopilot.ChannelGraphFromDatabase(svr.localChanDB.ChannelGraph()),
Constraints: atplConstraints,
ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) {
// First, we'll check if we're already connected to the
@ -256,7 +256,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
// We'll fetch the current state of open
// channels from the database to use as initial
// state for the auto-pilot agent.
activeChannels, err := svr.chanDB.FetchAllChannels()
activeChannels, err := svr.remoteChanDB.FetchAllChannels()
if err != nil {
return nil, err
}

@ -522,12 +522,12 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
chanPredicate *chanacceptor.ChainedAcceptor) (*rpcServer, error) {
// Set up router rpc backend.
channelGraph := s.chanDB.ChannelGraph()
channelGraph := s.localChanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
}
graph := s.chanDB.ChannelGraph()
graph := s.localChanDB.ChannelGraph()
routerBackend := &routerrpc.RouterBackend{
SelfNode: selfNode.PubKeyBytes,
FetchChannelCapacity: func(chanID uint64) (btcutil.Amount,
@ -576,10 +576,12 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
// Before we create any of the sub-servers, we need to ensure that all
// the dependencies they need are properly populated within each sub
// server configuration struct.
//
// TODO(roasbeef): extend sub-sever config to have both (local vs remote) DB
err = subServerCgs.PopulateDependencies(
cfg, s.cc, cfg.networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, activeNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.chanDB, s.sweeper, tower,
routerBackend, s.nodeSigner, s.remoteChanDB, s.sweeper, tower,
s.towerClient, cfg.net.ResolveTCPAddr, genInvoiceFeatures,
rpcsLog,
)
@ -1399,7 +1401,7 @@ func (r *rpcServer) VerifyMessage(ctx context.Context,
// channels signed the message.
//
// TODO(phlip9): Require valid nodes to have capital in active channels.
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
_, active, err := graph.HasLightningNode(pub)
if err != nil {
return nil, fmt.Errorf("failed to query graph: %v", err)
@ -1488,7 +1490,7 @@ func (r *rpcServer) DisconnectPeer(ctx context.Context,
// Next, we'll fetch the pending/active channels we have with a
// particular peer.
nodeChannels, err := r.server.chanDB.FetchOpenChannels(peerPubKey)
nodeChannels, err := r.server.remoteChanDB.FetchOpenChannels(peerPubKey)
if err != nil {
return nil, fmt.Errorf("unable to fetch channels for peer: %v", err)
}
@ -2065,7 +2067,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// First, we'll fetch the channel as is, as we'll need to examine it
// regardless of if this is a force close or not.
channel, err := r.server.chanDB.FetchChannel(*chanPoint)
channel, err := r.server.remoteChanDB.FetchChannel(*chanPoint)
if err != nil {
return err
}
@ -2329,7 +2331,7 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
return nil, err
}
dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
dbChan, err := r.server.remoteChanDB.FetchChannel(*chanPoint)
switch {
// If the channel isn't found in the set of open channels, then we can
// continue on as it can't be loaded into the link/peer.
@ -2360,12 +2362,12 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
// court. Between any step it's possible that the users restarts the
// process all over again. As a result, each of the steps below are
// intended to be idempotent.
err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight))
err = r.server.remoteChanDB.AbandonChannel(chanPoint, uint32(bestHeight))
if err != nil {
return nil, err
}
err = abandonChanFromGraph(
r.server.chanDB.ChannelGraph(), chanPoint,
r.server.localChanDB.ChannelGraph(), chanPoint,
)
if err != nil {
return nil, err
@ -2399,7 +2401,7 @@ func (r *rpcServer) GetInfo(ctx context.Context,
serverPeers := r.server.Peers()
openChannels, err := r.server.chanDB.FetchAllOpenChannels()
openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
@ -2414,7 +2416,7 @@ func (r *rpcServer) GetInfo(ctx context.Context,
inactiveChannels := uint32(len(openChannels)) - activeChannels
pendingChannels, err := r.server.chanDB.FetchPendingChannels()
pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels()
if err != nil {
return nil, fmt.Errorf("unable to get retrieve pending "+
"channels: %v", err)
@ -2713,7 +2715,7 @@ func (r *rpcServer) WalletBalance(ctx context.Context,
func (r *rpcServer) ChannelBalance(ctx context.Context,
in *lnrpc.ChannelBalanceRequest) (*lnrpc.ChannelBalanceResponse, error) {
openChannels, err := r.server.chanDB.FetchAllOpenChannels()
openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
@ -2723,7 +2725,7 @@ func (r *rpcServer) ChannelBalance(ctx context.Context,
balance += channel.LocalCommitment.LocalBalance.ToSatoshis()
}
pendingChannels, err := r.server.chanDB.FetchPendingChannels()
pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels()
if err != nil {
return nil, err
}
@ -2766,7 +2768,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// First, we'll populate the response with all the channels that are
// soon to be opened. We can easily fetch this data from the database
// and map the db struct to the proto response.
pendingOpenChannels, err := r.server.chanDB.FetchPendingChannels()
pendingOpenChannels, err := r.server.remoteChanDB.FetchPendingChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch pending channels: %v", err)
return nil, err
@ -2814,7 +2816,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// Next, we'll examine the channels that are soon to be closed so we
// can populate these fields within the response.
pendingCloseChannels, err := r.server.chanDB.FetchClosedChannels(true)
pendingCloseChannels, err := r.server.remoteChanDB.FetchClosedChannels(true)
if err != nil {
rpcsLog.Errorf("unable to fetch closed channels: %v", err)
return nil, err
@ -2843,7 +2845,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// not found, or the channel itself, this channel was closed
// in a version before we started persisting historical
// channels, so we silence the error.
historical, err := r.server.chanDB.FetchHistoricalChannel(
historical, err := r.server.remoteChanDB.FetchHistoricalChannel(
&pendingClose.ChanPoint,
)
switch err {
@ -2918,7 +2920,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// We'll also fetch all channels that are open, but have had their
// commitment broadcasted, meaning they are waiting for the closing
// transaction to confirm.
waitingCloseChans, err := r.server.chanDB.FetchWaitingCloseChannels()
waitingCloseChans, err := r.server.remoteChanDB.FetchWaitingCloseChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch channels waiting close: %v",
err)
@ -3153,7 +3155,7 @@ func (r *rpcServer) ClosedChannels(ctx context.Context,
resp := &lnrpc.ClosedChannelsResponse{}
dbChannels, err := r.server.chanDB.FetchClosedChannels(false)
dbChannels, err := r.server.remoteChanDB.FetchClosedChannels(false)
if err != nil {
return nil, err
}
@ -3230,9 +3232,9 @@ func (r *rpcServer) ListChannels(ctx context.Context,
resp := &lnrpc.ListChannelsResponse{}
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
dbChannels, err := r.server.chanDB.FetchAllOpenChannels()
dbChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
@ -3549,7 +3551,8 @@ func (r *rpcServer) createRPCClosedChannel(
CloseInitiator: closeInitiator,
}
reports, err := r.server.chanDB.FetchChannelReports(
reports, err := r.server.remoteChanDB.FetchChannelReports(
*activeNetParams.GenesisHash, &dbChannel.ChanPoint,
)
switch err {
@ -3654,7 +3657,7 @@ func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) (
// To get the close initiator for cooperative closes, we need
// to get the channel status from the historical channel bucket.
histChan, err := r.server.chanDB.FetchHistoricalChannel(chanPoint)
histChan, err := r.server.remoteChanDB.FetchHistoricalChannel(chanPoint)
switch {
// The node has upgraded from a version where we did not store
// historical channels, and has not closed a channel since. Do
@ -3718,7 +3721,7 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
// the server, or client exits.
defer channelEventSub.Cancel()
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
for {
select {
@ -4526,7 +4529,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
ChainParams: activeNetParams.Params,
NodeSigner: r.server.nodeSigner,
DefaultCLTVExpiry: defaultDelta,
ChanDB: r.server.chanDB,
ChanDB: r.server.remoteChanDB,
GenInvoiceFeatures: func() *lnwire.FeatureVector {
return r.server.featureMgr.Get(feature.SetInvoice)
},
@ -4640,7 +4643,7 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
PendingOnly: req.PendingOnly,
Reversed: req.Reversed,
}
invoiceSlice, err := r.server.chanDB.QueryInvoices(q)
invoiceSlice, err := r.server.remoteChanDB.QueryInvoices(q)
if err != nil {
return nil, fmt.Errorf("unable to query invoices: %v", err)
}
@ -4806,7 +4809,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
// Obtain the pointer to the global singleton channel graph, this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
// First iterate through all the known nodes (connected or unconnected
// within the graph), collating their current state into the RPC
@ -4944,7 +4947,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
// Obtain the pointer to the global singleton channel graph, this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
// Calculate betweenness centrality if requested. Note that depending on the
// graph size, this may take up to a few minutes.
@ -4983,7 +4986,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
func (r *rpcServer) GetChanInfo(ctx context.Context,
in *lnrpc.ChanInfoRequest) (*lnrpc.ChannelEdge, error) {
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
edgeInfo, edge1, edge2, err := graph.FetchChannelEdgesByID(in.ChanId)
if err != nil {
@ -5003,7 +5006,7 @@ func (r *rpcServer) GetChanInfo(ctx context.Context,
func (r *rpcServer) GetNodeInfo(ctx context.Context,
in *lnrpc.NodeInfoRequest) (*lnrpc.NodeInfo, error) {
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
// First, parse the hex-encoded public key into a full in-memory public
// key object we can work with for querying.
@ -5101,7 +5104,7 @@ func (r *rpcServer) QueryRoutes(ctx context.Context,
func (r *rpcServer) GetNetworkInfo(ctx context.Context,
_ *lnrpc.NetworkInfoRequest) (*lnrpc.NetworkInfo, error) {
graph := r.server.chanDB.ChannelGraph()
graph := r.server.localChanDB.ChannelGraph()
var (
numNodes uint32
@ -5381,7 +5384,7 @@ func (r *rpcServer) ListPayments(ctx context.Context,
query.MaxPayments = math.MaxUint64
}
paymentsQuerySlice, err := r.server.chanDB.QueryPayments(query)
paymentsQuerySlice, err := r.server.remoteChanDB.QueryPayments(query)
if err != nil {
return nil, err
}
@ -5413,7 +5416,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context,
rpcsLog.Debugf("[DeleteAllPayments]")
if err := r.server.chanDB.DeletePayments(); err != nil {
if err := r.server.remoteChanDB.DeletePayments(); err != nil {
return nil, err
}
@ -5533,7 +5536,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
rpcsLog.Debugf("[feereport]")
channelGraph := r.server.chanDB.ChannelGraph()
channelGraph := r.server.localChanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
@ -5572,7 +5575,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
return nil, err
}
fwdEventLog := r.server.chanDB.ForwardingLog()
fwdEventLog := r.server.remoteChanDB.ForwardingLog()
// computeFeeSum is a helper function that computes the total fees for
// a particular time slice described by a forwarding event query.
@ -5810,7 +5813,7 @@ func (r *rpcServer) ForwardingHistory(ctx context.Context,
IndexOffset: req.IndexOffset,
NumMaxEvents: numEvents,
}
timeSlice, err := r.server.chanDB.ForwardingLog().Query(eventQuery)
timeSlice, err := r.server.remoteChanDB.ForwardingLog().Query(eventQuery)
if err != nil {
return nil, fmt.Errorf("unable to query forwarding log: %v", err)
}
@ -5871,7 +5874,7 @@ func (r *rpcServer) ExportChannelBackup(ctx context.Context,
// the database. If this channel has been closed, or the outpoint is
// unknown, then we'll return an error
unpackedBackup, err := chanbackup.FetchBackupForChan(
chanPoint, r.server.chanDB,
chanPoint, r.server.remoteChanDB,
)
if err != nil {
return nil, err
@ -6041,7 +6044,7 @@ func (r *rpcServer) ExportAllChannelBackups(ctx context.Context,
// First, we'll attempt to read back ups for ALL currently opened
// channels from disk.
allUnpackedBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
r.server.remoteChanDB,
)
if err != nil {
return nil, fmt.Errorf("unable to fetch all static chan "+
@ -6064,7 +6067,7 @@ func (r *rpcServer) RestoreChannelBackups(ctx context.Context,
// restore either a set of chanbackup.Single or chanbackup.Multi
// backups.
chanRestorer := &chanDBRestorer{
db: r.server.chanDB,
db: r.server.remoteChanDB,
secretKeys: r.server.cc.keyRing,
chainArb: r.server.chainArb,
}
@ -6162,7 +6165,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
// we'll obtains the current set of single channel
// backups from disk.
chanBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
r.server.remoteChanDB,
)
if err != nil {
return fmt.Errorf("unable to fetch all "+

@ -204,7 +204,9 @@ type server struct {
fundingMgr *fundingManager
chanDB *channeldb.DB
localChanDB *channeldb.DB
remoteChanDB *channeldb.DB
htlcSwitch *htlcswitch.Switch
@ -328,7 +330,8 @@ func noiseDial(idKey keychain.SingleKeyECDH,
// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
func newServer(cfg *Config, listenAddrs []net.Addr,
localChanDB, remoteChanDB *channeldb.DB,
towerClientDB *wtdb.ClientDB, cc *chainControl,
nodeKeyDesc *keychain.KeyDescriptor,
chansToRestore walletunlocker.ChannelsToRecover,
@ -360,8 +363,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
copy(serializedPubKey[:], nodeKeyECDH.PubKey().SerializeCompressed())
// Initialize the sphinx router, placing it's persistent replay log in
// the same directory as the channel graph database.
sharedSecretPath := filepath.Join(cfg.localDatabaseDir(), "sphinxreplay.db")
// the same directory as the channel graph database. We don't need to
// replicate this data, so we'll store it locally.
sharedSecretPath := filepath.Join(
cfg.localDatabaseDir(), defaultSphinxDbName,
)
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
sphinxRouter := sphinx.NewRouter(
nodeKeyECDH, activeNetParams.Params, replayLog,
@ -405,7 +411,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s := &server{
cfg: cfg,
chanDB: chanDB,
localChanDB: localChanDB,
remoteChanDB: remoteChanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer),
writePool: writePool,
@ -413,11 +420,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
chansToRestore: chansToRestore,
invoices: invoices.NewRegistry(
chanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
remoteChanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
&registryConfig,
),
channelNotifier: channelnotifier.New(chanDB),
channelNotifier: channelnotifier.New(remoteChanDB),
identityECDH: nodeKeyECDH,
nodeSigner: netann.NewNodeSigner(nodeKeySigner),
@ -449,7 +456,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
}
s.witnessBeacon = &preimageBeacon{
wCache: chanDB.NewWitnessCache(),
wCache: remoteChanDB.NewWitnessCache(),
subscribers: make(map[uint64]*preimageSubscriber),
}
@ -461,7 +468,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
DB: chanDB,
DB: remoteChanDB,
LocalChannelClose: func(pubKey []byte,
request *htlcswitch.ChanClose) {
@ -476,7 +483,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
peer.HandleLocalCloseChanReqs(request)
},
FwdingLog: chanDB.ForwardingLog(),
FwdingLog: remoteChanDB.ForwardingLog(),
SwitchPackager: channeldb.NewSwitchPackager(),
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: s.fetchLastChanUpdate(),
@ -503,8 +510,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
MessageSigner: s.nodeSigner,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate,
DB: chanDB,
Graph: chanDB.ChannelGraph(),
DB: remoteChanDB,
Graph: localChanDB.ChannelGraph(),
}
chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
@ -594,7 +601,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
selfAddrs := make([]net.Addr, 0, len(externalIPs))
selfAddrs = append(selfAddrs, externalIPs...)
chanGraph := chanDB.ChannelGraph()
// As the graph can be obtained at anytime from the network, we won't
// replicate it, and instead it'll only be stored locally.
chanGraph := localChanDB.ChannelGraph()
// We'll now reconstruct a node announcement based on our current
// configuration so we can send it out as a sort of heart beat within
@ -661,7 +670,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
// The router will get access to the payment ID sequencer, such that it
// can generate unique payment IDs.
sequencer, err := htlcswitch.NewPersistentSequencer(chanDB)
sequencer, err := htlcswitch.NewPersistentSequencer(remoteChanDB)
if err != nil {
return nil, err
}
@ -694,7 +703,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
s.missionControl, err = routing.NewMissionControl(
chanDB,
remoteChanDB,
&routing.MissionControlConfig{
AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
@ -727,7 +736,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
PathFindingConfig: pathFindingConfig,
}
paymentControl := channeldb.NewPaymentControl(chanDB)
paymentControl := channeldb.NewPaymentControl(remoteChanDB)
s.controlTower = routing.NewControlTower(paymentControl)
@ -751,12 +760,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
return nil, fmt.Errorf("can't create router: %v", err)
}
chanSeries := discovery.NewChanSeries(s.chanDB.ChannelGraph())
gossipMessageStore, err := discovery.NewMessageStore(s.chanDB)
chanSeries := discovery.NewChanSeries(s.localChanDB.ChannelGraph())
gossipMessageStore, err := discovery.NewMessageStore(s.remoteChanDB)
if err != nil {
return nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB)
waitingProofStore, err := channeldb.NewWaitingProofStore(s.remoteChanDB)
if err != nil {
return nil, err
}
@ -793,10 +802,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
ForAllOutgoingChannels: s.chanRouter.ForAllOutgoingChannels,
PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
FetchChannel: s.chanDB.FetchChannel,
FetchChannel: s.remoteChanDB.FetchChannel,
}
utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
utxnStore, err := newNurseryStore(
activeNetParams.GenesisHash, remoteChanDB,
)
if err != nil {
srvrLog.Errorf("unable to create nursery store: %v", err)
return nil, err
@ -806,7 +817,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
sweep.DefaultBatchWindowDuration)
sweeperStore, err := sweep.NewSweeperStore(
chanDB, activeNetParams.GenesisHash,
remoteChanDB, activeNetParams.GenesisHash,
)
if err != nil {
srvrLog.Errorf("unable to create sweeper store: %v", err)
@ -833,8 +844,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s.utxoNursery = newUtxoNursery(&NurseryConfig{
ChainIO: cc.chainIO,
ConfDepth: 1,
FetchClosedChannels: chanDB.FetchClosedChannels,
FetchClosedChannel: chanDB.FetchClosedChannel,
FetchClosedChannels: remoteChanDB.FetchClosedChannels,
FetchClosedChannel: remoteChanDB.FetchClosedChannel,
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
Store: utxnStore,
@ -935,18 +946,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC,
Clock: clock.NewDefaultClock(),
}, chanDB)
}, remoteChanDB)
s.breachArbiter = newBreachArbiter(&BreachConfig{
CloseLink: closeLink,
DB: chanDB,
DB: remoteChanDB,
Estimator: s.cc.feeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.wallet),
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.wallet.Cfg.Signer,
Store: newRetributionStore(chanDB),
Store: newRetributionStore(remoteChanDB),
})
// Select the configuration and furnding parameters for Bitcoin or
@ -997,7 +1008,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
FindChannel: func(chanID lnwire.ChannelID) (
*channeldb.OpenChannel, error) {
dbChannels, err := chanDB.FetchAllChannels()
dbChannels, err := remoteChanDB.FetchAllChannels()
if err != nil {
return nil, err
}
@ -1161,10 +1172,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
// static backup of the latest channel state.
chanNotifier := &channelNotifier{
chanNotifier: s.channelNotifier,
addrs: s.chanDB,
addrs: s.remoteChanDB,
}
backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
startingChans, err := chanbackup.FetchStaticChanBackups(s.chanDB)
startingChans, err := chanbackup.FetchStaticChanBackups(s.remoteChanDB)
if err != nil {
return nil, err
}
@ -1183,7 +1194,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents,
SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents,
GetOpenChannels: s.chanDB.FetchAllOpenChannels,
GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels,
})
if cfg.WtClient.Active {
@ -1384,7 +1395,7 @@ func (s *server) Start() error {
// that have all the information we need to handle channel
// recovery _before_ we even accept connections from any peers.
chanRestorer := &chanDBRestorer{
db: s.chanDB,
db: s.remoteChanDB,
secretKeys: s.cc.keyRing,
chainArb: s.chainArb,
}
@ -1424,7 +1435,7 @@ func (s *server) Start() error {
// we'll prune our set of link nodes found within the database
// to ensure we don't reconnect to any nodes we no longer have
// open channels with.
if err := s.chanDB.PruneLinkNodes(); err != nil {
if err := s.remoteChanDB.PruneLinkNodes(); err != nil {
startErr = err
return
}
@ -1730,7 +1741,7 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e
// First, we'll create an instance of the ChannelGraphBootstrapper as
// this can be used by default if we've already partially seeded the
// network.
chanGraph := autopilot.ChannelGraphFromDatabase(s.chanDB.ChannelGraph())
chanGraph := autopilot.ChannelGraphFromDatabase(s.localChanDB.ChannelGraph())
graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
if err != nil {
return nil, err
@ -2062,7 +2073,7 @@ func (s *server) createNewHiddenService() error {
AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
}
copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
if err := s.chanDB.ChannelGraph().SetSourceNode(selfNode); err != nil {
if err := s.localChanDB.ChannelGraph().SetSourceNode(selfNode); err != nil {
return fmt.Errorf("can't set self node: %v", err)
}
@ -2119,7 +2130,7 @@ func (s *server) establishPersistentConnections() error {
// Iterate through the list of LinkNodes to find addresses we should
// attempt to connect to based on our set of previous connections. Set
// the reconnection port to the default peer port.
linkNodes, err := s.chanDB.FetchAllLinkNodes()
linkNodes, err := s.remoteChanDB.FetchAllLinkNodes()
if err != nil && err != channeldb.ErrLinkNodesNotFound {
return err
}
@ -2135,7 +2146,7 @@ func (s *server) establishPersistentConnections() error {
// After checking our previous connections for addresses to connect to,
// iterate through the nodes in our channel graph to find addresses
// that have been added via NodeAnnouncement messages.
chanGraph := s.chanDB.ChannelGraph()
chanGraph := s.localChanDB.ChannelGraph()
sourceNode, err := chanGraph.SourceNode()
if err != nil {
return err
@ -2842,7 +2853,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
ReadPool: s.readPool,
Switch: s.htlcSwitch,
InterceptSwitch: s.interceptableSwitch,
ChannelDB: s.chanDB,
ChannelDB: s.localChanDB,
ChannelGraph: s.remoteChanDB.ChannelGraph(),
ChainArb: s.chainArb,
AuthGossiper: s.authGossiper,
ChanStatusMgr: s.chanStatusMgr,
@ -3543,7 +3555,7 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
return nil, err
}
node, err := s.chanDB.ChannelGraph().FetchLightningNode(nil, vertex)
node, err := s.localChanDB.ChannelGraph().FetchLightningNode(nil, vertex)
if err != nil {
return nil, err
}