lnd: export Manager, NewFundingManager

This commit is contained in:
eugene 2020-12-15 11:41:43 -05:00
parent f43fd32daf
commit 6a209bbdaf
3 changed files with 49 additions and 49 deletions

@ -327,7 +327,7 @@ type fundingConfig struct {
MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
}
// fundingManager acts as an orchestrator/bridge between the wallet's
// Manager acts as an orchestrator/bridge between the wallet's
// 'ChannelReservation' workflow, and the wire protocol's funding initiation
// messages. Any requests to initiate the funding workflow for a channel,
// either kicked-off locally or remotely are handled by the funding manager.
@ -336,7 +336,7 @@ type fundingConfig struct {
// the channel workflow. Additionally, any temporary or permanent access
// controls between the wallet and remote peers are enforced via the funding
// manager.
type fundingManager struct {
type Manager struct {
started sync.Once
stopped sync.Once
@ -429,10 +429,10 @@ var (
ErrChannelNotFound = fmt.Errorf("channel not found")
)
// newFundingManager creates and initializes a new instance of the
// NewFundingManager creates and initializes a new instance of the
// fundingManager.
func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
return &fundingManager{
func NewFundingManager(cfg fundingConfig) (*Manager, error) {
return &Manager{
cfg: &cfg,
chanIDKey: cfg.TempChanIDSeed,
activeReservations: make(map[serializedPubKey]pendingChannels),
@ -448,7 +448,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
// Start launches all helper goroutines required for handling requests sent
// to the funding manager.
func (f *fundingManager) Start() error {
func (f *Manager) Start() error {
var err error
f.started.Do(func() {
err = f.start()
@ -456,7 +456,7 @@ func (f *fundingManager) Start() error {
return err
}
func (f *fundingManager) start() error {
func (f *Manager) start() error {
fndgLog.Tracef("Funding manager running")
// Upon restart, the Funding Manager will check the database to load any
@ -550,7 +550,7 @@ func (f *fundingManager) start() error {
// Stop signals all helper goroutines to execute a graceful shutdown. This
// method will block until all goroutines have exited.
func (f *fundingManager) Stop() error {
func (f *Manager) Stop() error {
var err error
f.stopped.Do(func() {
err = f.stop()
@ -558,7 +558,7 @@ func (f *fundingManager) Stop() error {
return err
}
func (f *fundingManager) stop() error {
func (f *Manager) stop() error {
fndgLog.Infof("Funding manager shutting down")
close(f.quit)
@ -569,7 +569,7 @@ func (f *fundingManager) stop() error {
// nextPendingChanID returns the next free pending channel ID to be used to
// identify a particular future channel funding workflow.
func (f *fundingManager) nextPendingChanID() [32]byte {
func (f *Manager) nextPendingChanID() [32]byte {
// Obtain a fresh nonce. We do this by encoding the current nonce
// counter, then incrementing it by one.
f.nonceMtx.Lock()
@ -592,7 +592,7 @@ func (f *fundingManager) nextPendingChanID() [32]byte {
// CancelPeerReservations cancels all active reservations associated with the
// passed node. This will ensure any outputs which have been pre committed,
// (and thus locked from coin selection), are properly freed.
func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) {
func (f *Manager) CancelPeerReservations(nodePub [33]byte) {
fndgLog.Debugf("Cancelling all reservations for peer %x", nodePub[:])
@ -632,7 +632,7 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) {
//
// TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding
// transaction, then all reservations should be cleared.
func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
func (f *Manager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
fundingErr error) {
fndgLog.Debugf("Failing funding flow for pending_id=%x: %v",
@ -684,7 +684,7 @@ func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
// funding workflow between the wallet, and any outside peers or local callers.
//
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) reservationCoordinator() {
func (f *Manager) reservationCoordinator() {
defer f.wg.Done()
zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval)
@ -729,7 +729,7 @@ func (f *fundingManager) reservationCoordinator() {
// to get OpenStatusUpdates.
//
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel,
func (f *Manager) advanceFundingState(channel *channeldb.OpenChannel,
pendingChanID [32]byte, updateChan chan<- *lnrpc.OpenStatusUpdate) {
defer f.wg.Done()
@ -798,7 +798,7 @@ func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel,
// machine. This method is synchronous and the new channel opening state will
// have been written to the database when it successfully returns. The
// updateChan can be set non-nil to get OpenStatusUpdates.
func (f *fundingManager) stateStep(channel *channeldb.OpenChannel,
func (f *Manager) stateStep(channel *channeldb.OpenChannel,
lnChannel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID, pendingChanID [32]byte,
channelState channelOpeningState,
@ -923,7 +923,7 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel,
// advancePendingChannelState waits for a pending channel's funding tx to
// confirm, and marks it open in the database when that happens.
func (f *fundingManager) advancePendingChannelState(
func (f *Manager) advancePendingChannelState(
channel *channeldb.OpenChannel, pendingChanID [32]byte) error {
confChannel, err := f.waitForFundingWithTimeout(channel)
@ -1006,7 +1006,7 @@ func (f *fundingManager) advancePendingChannelState(
// ProcessFundingMsg sends a message to the internal fundingManager goroutine,
// allowing it to handle the lnwire.Message.
func (f *fundingManager) ProcessFundingMsg(msg lnwire.Message, peer lnpeer.Peer) {
func (f *Manager) ProcessFundingMsg(msg lnwire.Message, peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingMsg{msg, peer}:
case <-f.quit:
@ -1057,7 +1057,7 @@ func commitmentType(localFeatures,
//
// TODO(roasbeef): add error chan to all, let channelManager handle
// error+propagate
func (f *fundingManager) handleFundingOpen(peer lnpeer.Peer,
func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
msg *lnwire.OpenChannel) {
// Check number of pending channels to be smaller than maximum allowed
@ -1405,7 +1405,7 @@ func (f *fundingManager) handleFundingOpen(peer lnpeer.Peer,
// handleFundingAccept processes a response to the workflow initiation sent by
// the remote peer. This message then queues a message with the funding
// outpoint, and a commitment signature to the remote peer.
func (f *fundingManager) handleFundingAccept(peer lnpeer.Peer,
func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
msg *lnwire.AcceptChannel) {
pendingChanID := msg.PendingChannelID
@ -1569,7 +1569,7 @@ func (f *fundingManager) handleFundingAccept(peer lnpeer.Peer,
// is continued.
//
// NOTE: This method must be called as a goroutine.
func (f *fundingManager) waitForPsbt(intent *chanfunding.PsbtIntent,
func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
resCtx *reservationWithCtx, pendingChanID [32]byte) {
// failFlow is a helper that logs an error message with the current
@ -1639,7 +1639,7 @@ func (f *fundingManager) waitForPsbt(intent *chanfunding.PsbtIntent,
// continueFundingAccept continues the channel funding flow once our
// contribution is finalized, the channel output is known and the funding
// transaction is signed.
func (f *fundingManager) continueFundingAccept(resCtx *reservationWithCtx,
func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx,
pendingChanID [32]byte) {
// Now that we have their contribution, we can extract, then send over
@ -1691,7 +1691,7 @@ func (f *fundingManager) continueFundingAccept(resCtx *reservationWithCtx,
// the responding side of a single funder workflow. Once this message has been
// processed, a signature is sent to the remote peer allowing it to broadcast
// the funding transaction, progressing the workflow into the final stage.
func (f *fundingManager) handleFundingCreated(peer lnpeer.Peer,
func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
msg *lnwire.FundingCreated) {
peerKey := peer.IdentityKey()
@ -1845,7 +1845,7 @@ func (f *fundingManager) handleFundingCreated(peer lnpeer.Peer,
// broadcast. Once the funding transaction reaches a sufficient number of
// confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the blockchain.
func (f *fundingManager) handleFundingSigned(peer lnpeer.Peer,
func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
msg *lnwire.FundingSigned) {
// As the funding signed message will reference the reservation by its
@ -2007,7 +2007,7 @@ type confirmedChannel struct {
// channel initiator and the maxWaitNumBlocksFundingConf has passed from the
// funding broadcast height. In case of confirmation, the short channel ID of
// the channel and the funding transaction will be returned.
func (f *fundingManager) waitForFundingWithTimeout(
func (f *Manager) waitForFundingWithTimeout(
ch *channeldb.OpenChannel) (*confirmedChannel, error) {
confChan := make(chan *confirmedChannel)
@ -2070,7 +2070,7 @@ func makeFundingScript(channel *channeldb.OpenChannel) ([]byte, error) {
// a *lnwire.ShortChannelID will be passed to confChan.
//
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) waitForFundingConfirmation(
func (f *Manager) waitForFundingConfirmation(
completeChan *channeldb.OpenChannel, cancelChan <-chan struct{},
confChan chan<- *confirmedChannel) {
@ -2161,7 +2161,7 @@ func (f *fundingManager) waitForFundingConfirmation(
//
// NOTE: timeoutChan MUST be buffered.
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) waitForTimeout(completeChan *channeldb.OpenChannel,
func (f *Manager) waitForTimeout(completeChan *channeldb.OpenChannel,
cancelChan <-chan struct{}, timeoutChan chan<- error) {
defer f.wg.Done()
@ -2217,7 +2217,7 @@ func (f *fundingManager) waitForTimeout(completeChan *channeldb.OpenChannel,
// the channelOpeningState markedOpen. In addition it will report the now
// decided short channel ID to the switch, and close the local discovery signal
// for this channel.
func (f *fundingManager) handleFundingConfirmation(
func (f *Manager) handleFundingConfirmation(
completeChan *channeldb.OpenChannel,
confChannel *confirmedChannel) error {
@ -2305,7 +2305,7 @@ func (f *fundingManager) handleFundingConfirmation(
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(
func (f *Manager) sendFundingLocked(
completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {
@ -2367,7 +2367,7 @@ func (f *fundingManager) sendFundingLocked(
// These announcement messages are NOT broadcasted to the greater network,
// only to the channel counter party. The proofs required to announce the
// channel to the greater network will be created and sent in annAfterSixConfs.
func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel,
func (f *Manager) addToRouterGraph(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
@ -2452,7 +2452,7 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel,
// 'addedToRouterGraph') and the channel is ready to be used. This is the last
// step in the channel opening process, and the opening state will be deleted
// from the database if successful.
func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// If this channel is not meant to be announced to the greater network,
@ -2575,7 +2575,7 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
// handleFundingLocked finalizes the channel funding process and enables the
// channel to enter normal operating mode.
func (f *fundingManager) handleFundingLocked(peer lnpeer.Peer,
func (f *Manager) handleFundingLocked(peer lnpeer.Peer,
msg *lnwire.FundingLocked) {
defer f.wg.Done()
@ -2698,7 +2698,7 @@ type chanAnnouncement struct {
// identity pub keys of both parties to the channel, and the second segment is
// authenticated only by us and contains our directional routing policy for the
// channel.
func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey,
func (f *Manager) newChanAnnouncement(localPubKey, remotePubKey,
localFundingKey, remoteFundingKey *btcec.PublicKey,
shortChanID lnwire.ShortChannelID, chanID lnwire.ChannelID,
fwdMinHTLC, fwdMaxHTLC lnwire.MilliSatoshi) (*chanAnnouncement, error) {
@ -2841,7 +2841,7 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey,
// the network during its next trickle.
// This method is synchronous and will return when all the network requests
// finish, either successfully or with an error.
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
func (f *Manager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey *btcec.PublicKey, shortChanID lnwire.ShortChannelID,
chanID lnwire.ChannelID) error {
@ -2919,7 +2919,7 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
// initFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
// TODO(roasbeef): re-visit blocking nature..
func (f *fundingManager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) {
func (f *Manager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) {
f.fundingRequests <- &initFundingMsg{
peer: peer,
openChanReq: req,
@ -2973,7 +2973,7 @@ func getUpfrontShutdownScript(enableUpfrontShutdown bool, peer lnpeer.Peer,
// handleInitFundingMsg creates a channel reservation within the daemon's
// wallet, then sends a funding request to the remote peer kicking off the
// funding workflow.
func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
func (f *Manager) handleInitFundingMsg(msg *initFundingMsg) {
var (
peerKey = msg.peer.IdentityKey()
localAmt = msg.localFundingAmt
@ -3217,7 +3217,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
// handleErrorMsg processes the error which was received from remote peer,
// depending on the type of error we should do different clean up steps and
// inform the user about it.
func (f *fundingManager) handleErrorMsg(peer lnpeer.Peer,
func (f *Manager) handleErrorMsg(peer lnpeer.Peer,
msg *lnwire.Error) {
chanID := msg.ChanID
@ -3254,7 +3254,7 @@ func (f *fundingManager) handleErrorMsg(peer lnpeer.Peer,
// pruneZombieReservations loops through all pending reservations and fails the
// funding flow for any reservations that have not been updated since the
// ReservationTimeout and are not locked waiting for the funding transaction.
func (f *fundingManager) pruneZombieReservations() {
func (f *Manager) pruneZombieReservations() {
zombieReservations := make(pendingChannels)
f.resMtx.RLock()
@ -3289,7 +3289,7 @@ func (f *fundingManager) pruneZombieReservations() {
// cancelReservationCtx does all needed work in order to securely cancel the
// reservation.
func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
func (f *Manager) cancelReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte, byRemote bool) (*reservationWithCtx, error) {
fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+
@ -3337,7 +3337,7 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
// deleteReservationCtx deletes the reservation uniquely identified by the
// target public key of the peer, and the specified pending channel ID.
func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey,
func (f *Manager) deleteReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte) {
// TODO(roasbeef): possibly cancel funding barrier in peer's
@ -3362,7 +3362,7 @@ func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey,
// getReservationCtx returns the reservation context for a particular pending
// channel ID for a target peer.
func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
func (f *Manager) getReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte) (*reservationWithCtx, error) {
peerIDKey := newSerializedKey(peerKey)
@ -3383,7 +3383,7 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
// of being funded. After the funding transaction has been confirmed, the
// channel will receive a new, permanent channel ID, and will no longer be
// considered pending.
func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte,
func (f *Manager) IsPendingChannel(pendingChanID [32]byte,
peer lnpeer.Peer) bool {
peerIDKey := newSerializedKey(peer.IdentityKey())
@ -3404,7 +3404,7 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
// saveChannelOpeningState saves the channelOpeningState for the provided
// chanPoint to the channelOpeningStateBucket.
func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
func (f *Manager) saveChannelOpeningState(chanPoint *wire.OutPoint,
state channelOpeningState, shortChanID *lnwire.ShortChannelID) error {
return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error {
@ -3431,7 +3431,7 @@ func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
// getChannelOpeningState fetches the channelOpeningState for the provided
// chanPoint from the database, or returns ErrChannelNotFound if the channel
// is not found.
func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) (
func (f *Manager) getChannelOpeningState(chanPoint *wire.OutPoint) (
channelOpeningState, *lnwire.ShortChannelID, error) {
var state channelOpeningState
@ -3467,7 +3467,7 @@ func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) (
}
// deleteChannelOpeningState removes any state for chanPoint from the database.
func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
func (f *Manager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
if bucket == nil {

@ -185,7 +185,7 @@ type testNode struct {
msgChan chan lnwire.Message
announceChan chan lnwire.Message
publTxChan chan *wire.MsgTx
fundingMgr *fundingManager
fundingMgr *Manager
newChannels chan *newChannelMsg
mockNotifier *mockNotifier
mockChanEvent *mockChanEvent
@ -447,7 +447,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
op(&fundingCfg)
}
f, err := newFundingManager(fundingCfg)
f, err := NewFundingManager(fundingCfg)
if err != nil {
t.Fatalf("failed creating fundingManager: %v", err)
}
@ -494,7 +494,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
chainedAcceptor := chanacceptor.NewChainedAcceptor()
f, err := newFundingManager(fundingConfig{
f, err := NewFundingManager(fundingConfig{
IDKey: oldCfg.IDKey,
Wallet: oldCfg.Wallet,
Notifier: oldCfg.Notifier,

@ -219,7 +219,7 @@ type server struct {
cc *chainreg.ChainControl
fundingMgr *fundingManager
fundingMgr *Manager
localChanDB *channeldb.DB
@ -1002,7 +1002,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return nil, err
}
s.fundingMgr, err = newFundingManager(fundingConfig{
s.fundingMgr, err = NewFundingManager(fundingConfig{
NoWumboChans: !cfg.ProtocolOptions.Wumbo(),
IDKey: nodeKeyECDH.PubKey(),
Wallet: cc.Wallet,