Merge pull request #1505 from wpaulino/fundingmanager-send-peer-directly

fundingmanager: send messages directly to peers
This commit is contained in:
Olaoluwa Osuntokun 2018-07-20 17:48:14 -07:00 committed by GitHub
commit 271db7d06d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 485 additions and 346 deletions

@ -107,7 +107,9 @@ type Config struct {
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{})
//
// NOTE: The peerChan channel must be buffered.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// ProofMatureDelta the number of confirmations which is needed before
// exchange the channel announcement proofs.
@ -2399,13 +2401,13 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
"to peer(%x): %v. Will retry when online.",
remotePeer.SerializeCompressed(), err)
connected := make(chan struct{})
d.cfg.NotifyWhenOnline(remotePeer, connected)
peerChan := make(chan lnpeer.Peer, 1)
d.cfg.NotifyWhenOnline(remotePeer, peerChan)
select {
case <-connected:
case <-peerChan:
// Retry sending.
log.Infof("peer %x reconnected. Retry sending"+
log.Infof("Peer %x reconnected. Retry sending"+
" AnnounceSignatures.",
remotePeer.SerializeCompressed())

@ -23,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/btcsuite/btcd/btcec"
@ -1186,9 +1187,9 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
// We expect the gossiper to register for a notification when the peer
// comes back online, so keep track of the channel it wants to get
// notified on.
notifyPeers := make(chan chan<- struct{}, 1)
notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
}
@ -1207,7 +1208,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
// Since sending this local announcement proof to the remote will fail,
// the gossiper should register for a notification when the remote is
// online again.
var conChan chan<- struct{}
var conChan chan<- lnpeer.Peer
select {
case conChan = <-notifyPeers:
case <-time.After(2 * time.Second):
@ -1371,9 +1372,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
msg ...lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer")
}
notifyPeers := make(chan chan<- struct{}, 1)
notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
}
@ -1391,7 +1392,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// Since sending to the remote peer will fail, the gossiper should
// register for a notification when it comes back online.
var conChan chan<- struct{}
var conChan chan<- lnpeer.Peer
select {
case conChan = <-notifyPeers:
case <-time.After(2 * time.Second):
@ -1430,7 +1431,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
return fmt.Errorf("intentional error in SendToPeer")
},
NotifyWhenOnline: func(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
},
Router: ctx.gossiper.cfg.Router,
@ -1606,9 +1607,9 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
return nil
}
notifyPeers := make(chan chan<- struct{}, 1)
notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
}
@ -2145,6 +2146,8 @@ type mockPeer struct {
quit chan struct{}
}
var _ lnpeer.Peer = (*mockPeer)(nil)
func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
if p.sentMsgs == nil && p.quit == nil {
return nil
@ -2159,6 +2162,9 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
return nil
}
func (p *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error {
return nil
}
func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil }
func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk }
func (p *mockPeer) PubKey() [33]byte {
@ -2166,3 +2172,4 @@ func (p *mockPeer) PubKey() [33]byte {
copy(pubkey[:], p.pk.SerializeCompressed())
return pubkey
}
func (p *mockPeer) Address() net.Addr { return nil }

@ -12,6 +12,10 @@ import (
"golang.org/x/crypto/salsa20"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
@ -19,14 +23,11 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
)
const (
@ -85,6 +86,12 @@ var (
//
// TODO(roasbeef): add command line param to modify
maxFundingAmount = maxBtcFundingAmount
// ErrFundingManagerShuttingDown is an error returned when attempting to
// process a funding request/message but the funding manager has already
// been signaled to shut down.
ErrFundingManagerShuttingDown = errors.New("funding manager shutting " +
"down")
)
// reservationWithCtx encapsulates a pending channel reservation. This wrapper
@ -98,7 +105,7 @@ var (
// * deadlines, etc.
type reservationWithCtx struct {
reservation *lnwallet.ChannelReservation
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
chanAmt btcutil.Amount
@ -145,7 +152,7 @@ func (r *reservationWithCtx) updateTimestamp() {
// embedded within this message giving the funding manager full context w.r.t
// the workflow.
type initFundingMsg struct {
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
*openChanReq
}
@ -153,47 +160,47 @@ type initFundingMsg struct {
// the message. This allows the funding manager to queue a response directly to
// the peer, progressing the funding workflow.
type fundingOpenMsg struct {
msg *lnwire.OpenChannel
peerAddress *lnwire.NetAddress
msg *lnwire.OpenChannel
peer lnpeer.Peer
}
// fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingAcceptMsg struct {
msg *lnwire.AcceptChannel
peerAddress *lnwire.NetAddress
msg *lnwire.AcceptChannel
peer lnpeer.Peer
}
// fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingCreatedMsg struct {
msg *lnwire.FundingCreated
peerAddress *lnwire.NetAddress
msg *lnwire.FundingCreated
peer lnpeer.Peer
}
// fundingSignedMsg couples an lnwire.FundingSigned message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingSignedMsg struct {
msg *lnwire.FundingSigned
peerAddress *lnwire.NetAddress
msg *lnwire.FundingSigned
peer lnpeer.Peer
}
// fundingLockedMsg couples an lnwire.FundingLocked message with the peer who
// sent the message. This allows the funding manager to finalize the funding
// process and announce the existence of the new channel.
type fundingLockedMsg struct {
msg *lnwire.FundingLocked
peerAddress *lnwire.NetAddress
msg *lnwire.FundingLocked
peer lnpeer.Peer
}
// fundingErrorMsg couples an lnwire.Error message with the peer who sent the
// message. This allows the funding manager to properly process the error.
type fundingErrorMsg struct {
err *lnwire.Error
peerAddress *lnwire.NetAddress
err *lnwire.Error
peerKey *btcec.PublicKey
}
// pendingChannels is a map instantiated per-peer which tracks all active
@ -257,21 +264,13 @@ type fundingConfig struct {
// to the greater network.
SendAnnouncement func(msg lnwire.Message) error
// SendToPeer allows the FundingManager to send messages to the peer
// node during the multiple steps involved in the creation of the
// channel's funding transaction and initial commitment transaction.
SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error
// NotifyWhenOnline allows the FundingManager to register with a
// subsystem that will notify it when the peer comes online.
// This is used when sending the fundingLocked message, since it MUST be
// subsystem that will notify it when the peer comes online. This is
// used when sending the fundingLocked message, since it MUST be
// delivered after the funding transaction is confirmed.
NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{})
// FindPeer searches the list of peers connected to the node so that
// the FundingManager can notify other daemon subsystems as necessary
// during the funding process.
FindPeer func(peerKey *btcec.PublicKey) (*peer, error)
//
// NOTE: The peerChan channel must be buffered.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// FindChannel queries the database for the channel with the given
// channel ID.
@ -318,9 +317,9 @@ type fundingConfig struct {
// WatchNewChannel is to be called once a new channel enters the final
// funding stage: waiting for on-chain confirmation. This method sends
// the channel to the ChainArbitrator so it can watch for any on-chain
// events related to the channel. We also provide the address of the
// events related to the channel. We also provide the public key of the
// node we're establishing a channel with for reconnection purposes.
WatchNewChannel func(*channeldb.OpenChannel, *lnwire.NetAddress) error
WatchNewChannel func(*channeldb.OpenChannel, *btcec.PublicKey) error
// ReportShortChanID allows the funding manager to report the newly
// discovered short channel ID of a formerly pending channel to outside
@ -547,16 +546,29 @@ func (f *fundingManager) Start() error {
// resume wait on startup.
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding" +
fndgLog.Errorf("Waiting for funding" +
"confirmation failed")
return
}
// Success, funding transaction was confirmed.
err := f.handleFundingConfirmation(ch, shortChanID)
// The funding transaction has confirmed, so
// we'll attempt to retrieve the remote peer
// to complete the rest of the funding flow.
peerChan := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(ch.IdentityPub, peerChan)
var peer lnpeer.Peer
select {
case peer = <-peerChan:
case <-f.quit:
return
}
err := f.handleFundingConfirmation(
peer, ch, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
fndgLog.Errorf("Failed to handle "+
"funding confirmation: %v", err)
return
}
}
@ -614,10 +626,21 @@ func (f *fundingManager) Start() error {
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
err := f.handleFundingConfirmation(dbChan, shortChanID)
peerChan := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(dbChan.IdentityPub, peerChan)
var peer lnpeer.Peer
select {
case peer = <-peerChan:
case <-f.quit:
return
}
err := f.handleFundingConfirmation(
peer, dbChan, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
fndgLog.Errorf("Failed to handle "+
"funding confirmation: %v", err)
return
}
}(channel)
@ -740,7 +763,7 @@ func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) {
select {
case f.queries <- req:
case <-f.quit:
return nil, fmt.Errorf("fundingmanager shutting down")
return nil, ErrFundingManagerShuttingDown
}
select {
@ -749,7 +772,7 @@ func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) {
case err := <-errChan:
return nil, err
case <-f.quit:
return nil, fmt.Errorf("fundingmanager shutting down")
return nil, ErrFundingManagerShuttingDown
}
}
@ -796,13 +819,13 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) {
//
// TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding
// transaction, then all reservations should be cleared.
func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
tempChanID [32]byte, fundingErr error) {
func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
fundingErr error) {
fndgLog.Debugf("Failing funding flow for pendingID=%x: %v",
tempChanID, fundingErr)
ctx, err := f.cancelReservationCtx(peer, tempChanID)
ctx, err := f.cancelReservationCtx(peer.IdentityKey(), tempChanID)
if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
@ -837,8 +860,8 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
}
fndgLog.Debugf("Sending funding error to peer (%x): %v",
peer.SerializeCompressed(), spew.Sdump(errMsg))
if err := f.cfg.SendToPeer(peer, errMsg); err != nil {
peer.IdentityKey().SerializeCompressed(), spew.Sdump(errMsg))
if err := peer.SendMessage(false, errMsg); err != nil {
fndgLog.Errorf("unable to send error message to peer %v", err)
}
}
@ -919,10 +942,10 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
// processFundingOpen sends a message to the fundingManager allowing it to
// initiate the new funding workflow with the source peer.
func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingOpenMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingOpenMsg{msg, peer}:
case <-f.quit:
return
}
@ -938,7 +961,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// Check number of pending channels to be smaller than maximum allowed
// number and send ErrorGeneric to remote peer if condition is
// violated.
peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey)
peerIDKey := newSerializedKey(fmsg.peer.IdentityKey())
msg := fmsg.msg
amt := msg.FundingAmount
@ -949,8 +972,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels {
f.resMtx.RUnlock()
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
lnwire.ErrMaxPendingChannels)
fmsg.peer, fmsg.msg.PendingChannelID,
lnwire.ErrMaxPendingChannels,
)
return
}
f.resMtx.RUnlock()
@ -964,8 +988,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Errorf("unable to query wallet: %v", err)
}
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
lnwire.ErrSynchronizingChain)
fmsg.peer, fmsg.msg.PendingChannelID,
lnwire.ErrSynchronizingChain,
)
return
}
@ -973,7 +998,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// current soft-limit for channel size.
if msg.FundingAmount > maxFundingAmount {
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
fmsg.peer, fmsg.msg.PendingChannelID,
lnwire.ErrChanTooLarge,
)
return
@ -983,7 +1008,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// a channel that's below our current min channel size.
if amt < f.cfg.MinChanSize {
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
fmsg.peer, fmsg.msg.PendingChannelID,
lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)),
)
return
@ -992,7 +1017,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+
"pendingId=%x) from peer(%x)", amt, msg.PushAmount,
msg.CsvDelay, msg.PendingChannelID,
fmsg.peerAddress.IdentityKey.SerializeCompressed())
fmsg.peer.IdentityKey().SerializeCompressed())
// Attempt to initialize a reservation within the wallet. If the wallet
// has insufficient resources to create the channel, then the
@ -1003,13 +1028,12 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
reservation, err := f.cfg.Wallet.InitChannelReservation(
amt, 0, msg.PushAmount,
lnwallet.SatPerKWeight(msg.FeePerKiloWeight), 0,
fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address,
fmsg.peer.IdentityKey(), fmsg.peer.Address(),
&chainHash, msg.ChannelFlags,
)
if err != nil {
fndgLog.Errorf("Unable to initialize reservation: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
@ -1029,9 +1053,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
)
if err != nil {
fndgLog.Errorf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
fmsg.msg.PendingChannelID, err,
)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
return
}
@ -1059,7 +1081,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
err: make(chan error, 1),
peerAddress: fmsg.peerAddress,
peer: fmsg.peer,
}
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
f.resMtx.Unlock()
@ -1101,8 +1123,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
err = reservation.ProcessSingleContribution(remoteContribution)
if err != nil {
fndgLog.Errorf("unable to add contribution reservation: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
@ -1130,11 +1151,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
HtlcPoint: ourContribution.HtlcBasePoint.PubKey,
FirstCommitmentPoint: ourContribution.FirstCommitmentPoint,
}
err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, &fundingAccept)
if err != nil {
if err := fmsg.peer.SendMessage(false, &fundingAccept); err != nil {
fndgLog.Errorf("unable to send funding response to peer: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
}
@ -1142,10 +1161,10 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// processFundingAccept sends a message to the fundingManager allowing it to
// continue the second phase of a funding workflow with the target peer.
func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingAcceptMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}:
case <-f.quit:
return
}
@ -1157,7 +1176,7 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
msg := fmsg.msg
pendingChanID := fmsg.msg.PendingChannelID
peerKey := fmsg.peerAddress.IdentityKey
peerKey := fmsg.peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
@ -1181,8 +1200,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
)
if err != nil {
fndgLog.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
fmsg.msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
return
}
@ -1228,9 +1246,8 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
err = resCtx.reservation.ProcessContribution(remoteContribution)
if err != nil {
fndgLog.Errorf("Unable to process contribution from %v: %v",
fmsg.peerAddress.IdentityKey, err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
@ -1273,15 +1290,12 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
fundingCreated.CommitSig, err = lnwire.NewSigFromRawSignature(sig)
if err != nil {
fndgLog.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated)
if err != nil {
if err := fmsg.peer.SendMessage(false, fundingCreated); err != nil {
fndgLog.Errorf("Unable to send funding complete message: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
}
@ -1289,10 +1303,10 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
// processFundingCreated queues a funding complete message coupled with the
// source peer to the fundingManager.
func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingCreatedMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}:
case <-f.quit:
return
}
@ -1303,7 +1317,7 @@ func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated,
// processed, a signature is sent to the remote peer allowing it to broadcast
// the funding transaction, progressing the workflow into the final stage.
func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
peerKey := fmsg.peerAddress.IdentityKey
peerKey := fmsg.peer.IdentityKey()
pendingChanID := fmsg.msg.PendingChannelID
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
@ -1333,8 +1347,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
return
}
@ -1374,8 +1387,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ourCommitSig, err := lnwire.NewSigFromRawSignature(sig)
if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
}
@ -1384,10 +1396,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ChanID: channelID,
CommitSig: ourCommitSig,
}
if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil {
if err := fmsg.peer.SendMessage(false, fundingSigned); err != nil {
fndgLog.Errorf("unable to send FundingSigned message: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
}
@ -1395,8 +1406,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// Now that we've sent over our final signature for this channel, we'll
// send it to the ChainArbitrator so it can watch for any on-chain
// actions during this final confirmation stage.
peerAddr := resCtx.peerAddress
if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil {
if err := f.cfg.WatchNewChannel(completeChan, peerKey); err != nil {
fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+
"arbitration: %v", fundingOut, err)
}
@ -1446,8 +1456,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
err := fmt.Errorf("timeout waiting for funding tx "+
"(%v) to confirm", completeChan.FundingOutpoint)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
case <-f.quit:
@ -1466,8 +1475,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// Success, funding transaction was confirmed.
f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
err := f.handleFundingConfirmation(completeChan,
shortChanID)
err := f.handleFundingConfirmation(
fmsg.peer, completeChan, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
@ -1479,10 +1489,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// processFundingSigned sends a single funding sign complete message along with
// the source peer to the funding manager.
func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingSignedMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingSignedMsg{msg, peer}:
case <-f.quit:
return
}
@ -1505,20 +1515,17 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
err := fmt.Errorf("Unable to find signed reservation for "+
"chan_id=%x", fmsg.msg.ChanID)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
fmsg.msg.ChanID, err)
f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err)
return
}
peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(fmsg.peerAddress.IdentityKey,
pendingChanID)
peerKey := fmsg.peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
fndgLog.Warnf("Unable to find reservation (peerID:%v, chanID:%x)",
peerKey, pendingChanID[:])
// TODO: add ErrChanNotFound?
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
return
}
@ -1538,8 +1545,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig)
if err != nil {
fndgLog.Errorf("Unable to complete reservation sign complete: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
return
}
@ -1547,14 +1553,14 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// we'll send the to be active channel to the ChainArbitrator so it can
// watch for any on-chin actions before the channel has fully
// confirmed.
peerAddr := resCtx.peerAddress
if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil {
if err := f.cfg.WatchNewChannel(completeChan, peerKey); err != nil {
fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+
"arbitration: %v", fundingPoint, err)
}
fndgLog.Infof("Finalizing pendingID(%x) over ChannelPoint(%v), "+
"waiting for channel open on-chain", pendingChanID[:], fundingPoint)
"waiting for channel open on-chain", pendingChanID[:],
fundingPoint)
// Send an update to the upstream client that the negotiation process
// is over.
@ -1619,7 +1625,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}
defer lnChannel.Stop()
err = f.sendFundingLocked(completeChan, lnChannel, shortChanID)
err = f.sendFundingLocked(
fmsg.peer, completeChan, lnChannel, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed sending fundingLocked: %v", err)
return
@ -1865,10 +1873,11 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
}
// handleFundingConfirmation is a wrapper method for creating a new
// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph,
// and annAfterSixConfs. This is called after the funding transaction is
// confirmed.
func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel,
// lnwallet.LightningChannel object, calling sendFundingLocked,
// addToRouterGraph, and annAfterSixConfs. This is called after the funding
// transaction is confirmed.
func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer,
completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// We create the state-machine object which wraps the database state.
@ -1884,7 +1893,7 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC
fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID)
err = f.sendFundingLocked(completeChan, lnChannel, shortChanID)
err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending fundingLocked: %v", err)
}
@ -1904,11 +1913,12 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel,
func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
peerKey := completeChan.IdentityPub
// Next, we'll send over the funding locked message which marks that we
// consider the channel open by presenting the remote party with our
@ -1933,33 +1943,30 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
// down.
for {
fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+
"peer %x", chanID,
completeChan.IdentityPub.SerializeCompressed())
"peer %x", chanID, peerKey.SerializeCompressed())
err = f.cfg.SendToPeer(completeChan.IdentityPub,
fundingLockedMsg)
if err == nil {
// Sending succeeded, we can break out and continue
// the funding flow.
if err := peer.SendMessage(false, fundingLockedMsg); err == nil {
// Sending succeeded, we can break out and continue the
// funding flow.
break
}
fndgLog.Warnf("unable to send fundingLocked to peer %x: "+
"%v. Will retry when online",
completeChan.IdentityPub.SerializeCompressed(), err)
fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+
"Will retry when online", peerKey.SerializeCompressed(),
err)
connected := make(chan struct{})
connected := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected)
select {
case <-connected:
fndgLog.Infof("Peer(%x) came back online, will retry "+
"sending FundingLocked for ChannelID(%v)",
completeChan.IdentityPub.SerializeCompressed(),
chanID)
peerKey.SerializeCompressed(), chanID)
// Retry sending.
case <-f.quit:
return fmt.Errorf("shutting down unable to send")
return ErrFundingManagerShuttingDown
}
}
@ -2069,24 +2076,28 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
confNtfn, err := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid,
numConfs, completeChan.FundingBroadcastHeight)
if err != nil {
return fmt.Errorf("Unable to register for confirmation of "+
"ChannelPoint(%v): %v", completeChan.FundingOutpoint, err)
return fmt.Errorf("Unable to register for "+
"confirmation of ChannelPoint(%v): %v",
completeChan.FundingOutpoint, err)
}
// Wait until 6 confirmations has been reached or the wallet signals
// a shutdown.
// Wait until 6 confirmations has been reached or the wallet
// signals a shutdown.
select {
case _, ok := <-confNtfn.Confirmed:
if !ok {
return fmt.Errorf("ChainNotifier shutting down, cannot "+
"complete funding flow for ChannelPoint(%v)",
return fmt.Errorf("ChainNotifier shutting "+
"down, cannot complete funding flow "+
"for ChannelPoint(%v)",
completeChan.FundingOutpoint)
}
// Fallthrough.
case <-f.quit:
return fmt.Errorf("fundingManager shutting down, stopping funding "+
"flow for ChannelPoint(%v)", completeChan.FundingOutpoint)
return fmt.Errorf("%v, stopping funding flow for "+
"ChannelPoint(%v)",
ErrFundingManagerShuttingDown,
completeChan.FundingOutpoint)
}
fundingPoint := completeChan.FundingOutpoint
@ -2095,9 +2106,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v",
&fundingPoint, spew.Sdump(shortChanID))
// We'll obtain their min HTLC as we'll use this value within our
// ChannelUpdate. We use this value isn't of ours, as the remote party
// will be the one that's carrying the HTLC towards us.
// We'll obtain their min HTLC as we'll use this value within
// our ChannelUpdate. We use this value isn't of ours, as the
// remote party will be the one that's carrying the HTLC towards
// us.
remoteMinHTLC := completeChan.RemoteChanCfg.MinHTLC
// Create and broadcast the proofs required to make this channel
@ -2131,10 +2143,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
// processFundingLocked sends a message to the fundingManager allowing it to
// finish the funding workflow.
func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingLockedMsg{msg, peer}:
case <-f.quit:
return
}
@ -2146,7 +2158,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
defer f.wg.Done()
fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+
"peer %x", fmsg.msg.ChanID,
fmsg.peerAddress.IdentityKey.SerializeCompressed())
fmsg.peer.IdentityKey().SerializeCompressed())
// If we are currently in the process of handling a funding locked
// message for this channel, ignore.
@ -2242,32 +2254,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
f.barrierMtx.Unlock()
}()
// Finally, we'll find the peer that sent us this message so we can
// provide it with the fully initialized channel state.
peer, err := f.cfg.FindPeer(fmsg.peerAddress.IdentityKey)
if err != nil {
fndgLog.Errorf("Unable to find peer: %v", err)
if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil {
fndgLog.Errorf("Unable to add new channel %v with peer %x: %v",
fmsg.peer.IdentityKey().SerializeCompressed(),
*channel.ChanPoint, err)
channel.Stop()
return
}
newChanDone := make(chan struct{})
newChanMsg := &newChannelMsg{
channel: channel,
done: newChanDone,
}
select {
case peer.newChannels <- newChanMsg:
case <-f.quit:
return
}
// We pause here to wait for the peer to recognize the new channel
// before we close the channel barrier corresponding to the channel.
select {
case <-f.quit:
return
case <-newChanDone: // Fallthrough if we're not quitting.
}
}
@ -2486,11 +2477,9 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
// initFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
// TODO(roasbeef): re-visit blocking nature..
func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress,
req *openChanReq) {
func (f *fundingManager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) {
f.fundingRequests <- &initFundingMsg{
peerAddress: peerAddress,
peer: peer,
openChanReq: req,
}
}
@ -2500,7 +2489,7 @@ func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress,
// funding workflow.
func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
var (
peerKey = msg.peerAddress.IdentityKey
peerKey = msg.peer.IdentityKey()
localAmt = msg.localFundingAmt
remoteAmt = msg.remoteFundingAmt
capacity = localAmt + remoteAmt
@ -2519,8 +2508,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+
"capacity=%v, chainhash=%v, addr=%v, dustLimit=%v)", localAmt,
msg.pushAmt, capacity, msg.chainHash, msg.peerAddress.Address,
ourDustLimit)
msg.pushAmt, capacity, msg.chainHash, peerKey, ourDustLimit)
// First, we'll query the fee estimator for a fee that should get the
// commitment transaction confirmed by the next few blocks (conf target
@ -2556,7 +2544,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
// request will fail, and be aborted.
reservation, err := f.cfg.Wallet.InitChannelReservation(
capacity, localAmt, msg.pushAmt, commitFeePerKw,
msg.fundingFeePerVSize, peerKey, msg.peerAddress.Address,
msg.fundingFeePerVSize, peerKey, msg.peer.Address(),
&msg.chainHash, channelFlags,
)
if err != nil {
@ -2597,7 +2585,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
reservation: reservation,
peerAddress: msg.peerAddress,
peer: msg.peer,
updates: msg.updates,
err: msg.err,
}
@ -2619,7 +2607,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
maxHtlcs := f.cfg.RequiredRemoteMaxHTLCs(capacity)
fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)",
msg.peerAddress.Address, chanID)
msg.peer.Address(), chanID)
fundingOpen := lnwire.OpenChannel{
ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash,
@ -2641,7 +2629,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
FirstCommitmentPoint: ourContribution.FirstCommitmentPoint,
ChannelFlags: channelFlags,
}
if err := f.cfg.SendToPeer(peerKey, &fundingOpen); err != nil {
if err := msg.peer.SendMessage(false, &fundingOpen); err != nil {
e := fmt.Errorf("Unable to send funding request message: %v",
err)
fndgLog.Errorf(e.Error())
@ -2681,10 +2669,10 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
// processFundingError sends a message to the fundingManager allowing it to
// process the occurred generic error.
func (f *fundingManager) processFundingError(err *lnwire.Error,
peerAddress *lnwire.NetAddress) {
peerKey *btcec.PublicKey) {
select {
case f.fundingMsgs <- &fundingErrorMsg{err, peerAddress}:
case f.fundingMsgs <- &fundingErrorMsg{err, peerKey}:
case <-f.quit:
return
}
@ -2696,13 +2684,12 @@ func (f *fundingManager) processFundingError(err *lnwire.Error,
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
protocolErr := fmsg.err
peerKey := fmsg.peerAddress.IdentityKey
chanID := fmsg.err.ChanID
// First, we'll attempt to retrieve and cancel the funding workflow
// that this error was tied to. If we're unable to do so, then we'll
// exit early as this was an unwarranted error.
resCtx, err := f.cancelReservationCtx(peerKey, chanID)
resCtx, err := f.cancelReservationCtx(fmsg.peerKey, chanID)
if err != nil {
fndgLog.Warnf("Received error for non-existent funding "+
"flow: %v (%v)", err, spew.Sdump(protocolErr))
@ -2713,7 +2700,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
// error back to the caller (if any), and cancel the workflow itself.
lnErr := lnwire.ErrorCode(protocolErr.Data[0])
fndgLog.Errorf("Received funding error from %x: %v",
peerKey.SerializeCompressed(), string(protocolErr.Data),
fmsg.peerKey.SerializeCompressed(), string(protocolErr.Data),
)
// If this isn't a simple error code, then we'll display the entire
@ -2753,10 +2740,11 @@ func (f *fundingManager) pruneZombieReservations() {
f.resMtx.RUnlock()
for pendingChanID, resCtx := range zombieReservations {
err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+
"chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:])
err := fmt.Errorf("reservation timed out waiting for peer "+
"(peerID:%v, chanID:%x)", resCtx.peer.IdentityKey(),
pendingChanID[:])
fndgLog.Warnf(err.Error())
f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err)
f.failFundingFlow(resCtx.peer, pendingChanID, err)
}
}
@ -2849,9 +2837,9 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
// channel will receive a new, permanent channel ID, and will no longer be
// considered pending.
func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte,
peerAddress *lnwire.NetAddress) bool {
peerKey *btcec.PublicKey) bool {
peerIDKey := newSerializedKey(peerAddress.IdentityKey)
peerIDKey := newSerializedKey(peerKey)
f.resMtx.RLock()
_, ok := f.activeReservations[peerIDKey][pendingChanID]
f.resMtx.RUnlock()

@ -3,6 +3,7 @@
package main
import (
"errors"
"fmt"
"io/ioutil"
"math/big"
@ -12,22 +13,22 @@ import (
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcutil"
_ "github.com/btcsuite/btcwallet/walletdb/bdb"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
_ "github.com/btcsuite/btcwallet/walletdb/bdb"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
)
const (
@ -134,14 +135,64 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
type testNode struct {
privKey *btcec.PrivateKey
addr *lnwire.NetAddress
msgChan chan lnwire.Message
announceChan chan lnwire.Message
publTxChan chan *wire.MsgTx
fundingMgr *fundingManager
peer *peer
newChannels chan *newChannelMsg
mockNotifier *mockNotifier
testDir string
shutdownChannel chan struct{}
remotePeer *testNode
sendMessage func(lnwire.Message) error
}
var _ lnpeer.Peer = (*testNode)(nil)
func (n *testNode) IdentityKey() *btcec.PublicKey {
return n.addr.IdentityKey
}
func (n *testNode) Address() net.Addr {
return n.addr.Address
}
func (n *testNode) PubKey() [33]byte {
return newSerializedKey(n.addr.IdentityKey)
}
func (n *testNode) SendMessage(_ bool, msg ...lnwire.Message) error {
return n.sendMessage(msg[0])
}
func (n *testNode) WipeChannel(_ *wire.OutPoint) error {
return nil
}
func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel,
quit <-chan struct{}) error {
done := make(chan struct{})
msg := &newChannelMsg{
channel: channel,
done: done,
}
select {
case n.newChannels <- msg:
case <-quit:
return ErrFundingManagerShuttingDown
}
select {
case <-done:
case <-quit:
return ErrFundingManagerShuttingDown
}
return nil
}
func init() {
@ -180,7 +231,7 @@ func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params,
}
func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
tempTestDir string) (*testNode, error) {
addr *lnwire.NetAddress, tempTestDir string) (*testNode, error) {
netParams := activeNetParams.Params
estimator := lnwallet.StaticFeeEstimator{FeeRate: 250}
@ -191,11 +242,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
epochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
newChannelsChan := make(chan *newChannelMsg)
p := &peer{
newChannels: newChannelsChan,
}
sentMessages := make(chan lnwire.Message)
sentAnnouncements := make(chan lnwire.Message)
publTxChan := make(chan *wire.MsgTx, 1)
@ -249,20 +295,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil
},
SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error {
select {
case sentMessages <- msgs[0]:
case <-shutdownChan:
return fmt.Errorf("shutting down")
}
return nil
},
NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) {
t.Fatalf("did not expect fundingManager to call NotifyWhenOnline")
},
FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) {
return p, nil
},
TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) {
dbChannels, err := cdb.FetchAllChannels()
@ -311,7 +343,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
return uint16(lnwallet.MaxHTLCNumber / 2)
},
WatchNewChannel: func(*channeldb.OpenChannel, *lnwire.NetAddress) error {
WatchNewChannel: func(*channeldb.OpenChannel, *btcec.PublicKey) error {
return nil
},
ReportShortChanID: func(wire.OutPoint) error {
@ -323,22 +355,30 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
if err != nil {
t.Fatalf("failed creating fundingManager: %v", err)
}
if err = f.Start(); err != nil {
t.Fatalf("failed starting fundingManager: %v", err)
}
return &testNode{
testNode := &testNode{
privKey: privKey,
msgChan: sentMessages,
newChannels: make(chan *newChannelMsg),
announceChan: sentAnnouncements,
publTxChan: publTxChan,
fundingMgr: f,
peer: p,
mockNotifier: chainNotifier,
testDir: tempTestDir,
shutdownChannel: shutdownChan,
}, nil
addr: addr,
}
f.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connectedChan chan<- lnpeer.Peer) {
connectedChan <- testNode
}
return testNode, nil
}
func recreateAliceFundingManager(t *testing.T, alice *testNode) {
@ -375,19 +415,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil
},
SendToPeer: func(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
select {
case aliceMsgChan <- msgs[0]:
case <-shutdownChan:
return fmt.Errorf("shutting down")
}
return nil
NotifyWhenOnline: func(peer *btcec.PublicKey,
connectedChan chan<- lnpeer.Peer) {
connectedChan <- alice.remotePeer
},
NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) {
t.Fatalf("did not expect fundingManager to call NotifyWhenOnline")
},
FindPeer: oldCfg.FindPeer,
TempChanIDSeed: oldCfg.TempChanIDSeed,
FindChannel: oldCfg.FindChannel,
PublishTransaction: func(txn *wire.MsgTx) error {
@ -424,7 +456,9 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) {
t.Fatalf("unable to create temp directory: %v", err)
}
alice, err := createTestFundingManager(t, alicePrivKey, aliceTestDir)
alice, err := createTestFundingManager(
t, alicePrivKey, aliceAddr, aliceTestDir,
)
if err != nil {
t.Fatalf("failed creating fundingManager: %v", err)
}
@ -434,11 +468,37 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) {
t.Fatalf("unable to create temp directory: %v", err)
}
bob, err := createTestFundingManager(t, bobPrivKey, bobTestDir)
bob, err := createTestFundingManager(t, bobPrivKey, bobAddr, bobTestDir)
if err != nil {
t.Fatalf("failed creating fundingManager: %v", err)
}
// With the funding manager's created, we'll now attempt to mimic a
// connection pipe between them. In order to intercept the messages
// within it, we'll redirect all messages back to the msgChan of the
// sender. Since the fundingManager now has a reference to peers itself,
// alice.sendMessage will be triggered when Bob's funding manager
// attempts to send a message to Alice and vice versa.
alice.remotePeer = bob
alice.sendMessage = func(msg lnwire.Message) error {
select {
case alice.remotePeer.msgChan <- msg:
case <-alice.shutdownChannel:
return errors.New("shutting down")
}
return nil
}
bob.remotePeer = alice
bob.sendMessage = func(msg lnwire.Message) error {
select {
case bob.remotePeer.msgChan <- msg:
case <-bob.shutdownChannel:
return errors.New("shutting down")
}
return nil
}
return alice, bob
}
@ -473,7 +533,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
err: errChan,
}
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
alice.fundingMgr.initFundingWorkflow(bob, initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
@ -498,7 +558,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
}
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
// Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent(
@ -506,7 +566,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.AcceptChannel)
// Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr)
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
// Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent(
@ -514,7 +574,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.FundingCreated)
// Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr)
bob.fundingMgr.processFundingCreated(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent(
@ -522,7 +582,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.FundingSigned)
// Forward the signature to Alice.
alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr)
alice.fundingMgr.processFundingSigned(fundingSigned, bob)
// After Alice processes the singleFundingSignComplete message, she will
// broadcast the funding transaction to the network. We expect to get a
@ -861,14 +921,14 @@ func assertErrChannelNotFound(t *testing.T, node *testNode,
func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) {
// They should both send the new channel state to their peer.
select {
case c := <-alice.peer.newChannels:
case c := <-alice.newChannels:
close(c.done)
case <-time.After(time.Second * 15):
t.Fatalf("alice did not send new channel to peer")
}
select {
case c := <-bob.peer.newChannels:
case c := <-bob.newChannels:
close(c.done)
case <-time.After(time.Second * 15):
t.Fatalf("bob did not send new channel to peer")
@ -935,8 +995,8 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -970,12 +1030,15 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// sending it on restart. We mimic this behavior by letting the
// SendToPeer method return an error, as if the message was not
// successfully sent. We then recreate the fundingManager and make sure
// it continues the process as expected.
alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
// it continues the process as expected. We'll save the current
// implementation of sendMessage to restore the original behavior later
// on.
workingSendMessage := bob.sendMessage
bob.sendMessage = func(msg lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer")
}
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, con chan<- struct{}) {
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
con chan<- lnpeer.Peer) {
// Intentionally empty.
}
@ -1010,8 +1073,14 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// While Bob successfully sent fundingLocked.
assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent)
// We now recreate Alice's fundingManager, and expect it to retry
// sending the fundingLocked message.
// We now recreate Alice's fundingManager with the correct sendMessage
// implementation, and expect it to retry sending the fundingLocked
// message. We'll explicitly shut down Alice's funding manager to
// prevent a race when overriding the sendMessage implementation.
if err := alice.fundingMgr.Stop(); err != nil {
t.Fatalf("unable to stop alice's funding manager: %v", err)
}
bob.sendMessage = workingSendMessage
recreateAliceFundingManager(t, alice)
// Intentionally make the channel announcements fail
@ -1036,8 +1105,8 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
}
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1090,13 +1159,17 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
// fundingLocked message to the other peer. If the funding node fails
// to send the fundingLocked message to the peer, it should wait for
// the server to notify it that the peer is back online, and try again.
alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
// We'll save the current implementation of sendMessage to restore the
// original behavior later on.
workingSendMessage := bob.sendMessage
bob.sendMessage = func(msg lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer")
}
peerChan := make(chan *btcec.PublicKey, 1)
conChan := make(chan chan<- struct{}, 1)
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connected chan<- struct{}) {
conChan := make(chan chan<- lnpeer.Peer, 1)
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
connected chan<- lnpeer.Peer) {
peerChan <- peer
conChan <- connected
}
@ -1132,9 +1205,10 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
// While Bob successfully sent fundingLocked.
assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent)
// Alice should be waiting for the server to notify when Bob comes back online.
// Alice should be waiting for the server to notify when Bob comes back
// online.
var peer *btcec.PublicKey
var con chan<- struct{}
var con chan<- lnpeer.Peer
select {
case peer = <-peerChan:
// Expected
@ -1154,17 +1228,10 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
bobPubKey, peer)
}
// Fix Alice's SendToPeer, and notify that Bob is back online.
alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
select {
case alice.msgChan <- msgs[0]:
case <-alice.shutdownChannel:
return fmt.Errorf("shutting down")
}
return nil
}
close(con)
// Restore the correct sendMessage implementation, and notify that Bob
// is back online.
bob.sendMessage = workingSendMessage
con <- bob
// This should make Alice send the fundingLocked.
fundingLockedAlice := assertFundingMsgSent(
@ -1186,8 +1253,8 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1228,7 +1295,7 @@ func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) {
err: errChan,
}
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
alice.fundingMgr.initFundingWorkflow(bob, initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
@ -1288,7 +1355,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) {
err: errChan,
}
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
alice.fundingMgr.initFundingWorkflow(bob, initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
@ -1316,7 +1383,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) {
assertNumPendingReservations(t, alice, bobPubKey, 1)
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
// Bob should answer with an AcceptChannel.
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
@ -1357,7 +1424,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
err: errChan,
}
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
alice.fundingMgr.initFundingWorkflow(bob, initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
@ -1385,7 +1452,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
assertNumPendingReservations(t, alice, bobPubKey, 1)
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
// Bob should answer with an AcceptChannel.
acceptChannelResponse := assertFundingMsgSent(
@ -1396,7 +1463,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
assertNumPendingReservations(t, bob, alicePubKey, 1)
// Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr)
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
// Alice responds with a FundingCreated messages.
assertFundingMsgSent(t, alice.msgChan, "FundingCreated")
@ -1571,9 +1638,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Send the fundingLocked message twice to Alice, and once to Bob.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1582,7 +1649,7 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
// Alice should not send the channel state the second time, as the
// second funding locked should just be ignored.
select {
case <-alice.peer.newChannels:
case <-alice.newChannels:
t.Fatalf("alice sent new channel to peer a second time")
case <-time.After(time.Millisecond * 300):
// Expected
@ -1590,9 +1657,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
// Another fundingLocked should also be ignored, since Alice should
// have updated her database at this point.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
select {
case <-alice.peer.newChannels:
case <-alice.newChannels:
t.Fatalf("alice sent new channel to peer a second time")
case <-time.After(time.Millisecond * 300):
// Expected
@ -1665,8 +1732,8 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
recreateAliceFundingManager(t, alice)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1723,10 +1790,10 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Let Alice immediately get the fundingLocked message.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
// Also let Bob get the fundingLocked message.
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1801,8 +1868,8 @@ func TestFundingManagerPrivateChannel(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1886,8 +1953,8 @@ func TestFundingManagerPrivateRestart(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1954,7 +2021,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
err: errChan,
}
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
alice.fundingMgr.initFundingWorkflow(bob, initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
@ -1993,7 +2060,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
chanID := openChannelReq.PendingChannelID
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
// Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent(
@ -2013,7 +2080,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
}
// Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr)
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
// Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent(
@ -2021,7 +2088,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
).(*lnwire.FundingCreated)
// Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr)
bob.fundingMgr.processFundingCreated(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent(
@ -2029,7 +2096,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
).(*lnwire.FundingSigned)
// Forward the signature to Alice.
alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr)
alice.fundingMgr.processFundingSigned(fundingSigned, bob)
// After Alice processes the singleFundingSignComplete message, she will
// broadcast the funding transaction to the network. We expect to get a

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math"
"net"
"reflect"
"runtime"
"strings"
@ -1411,6 +1412,9 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {
}
return nil
}
func (m *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error {
return nil
}
func (m *mockPeer) WipeChannel(*wire.OutPoint) error {
return nil
}
@ -1420,8 +1424,9 @@ func (m *mockPeer) PubKey() [33]byte {
func (m *mockPeer) IdentityKey() *btcec.PublicKey {
return nil
}
var _ lnpeer.Peer = (*mockPeer)(nil)
func (m *mockPeer) Address() net.Addr {
return nil
}
func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error,

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"sync"
"sync/atomic"
"testing"
@ -517,6 +518,16 @@ func (s *mockServer) IdentityKey() *btcec.PublicKey {
return pubkey
}
func (s *mockServer) Address() net.Addr {
return nil
}
func (s *mockServer) AddNewChannel(channel *lnwallet.LightningChannel,
cancel <-chan struct{}) error {
return nil
}
func (s *mockServer) WipeChannel(*wire.OutPoint) error {
return nil
}

6
lnd.go

@ -360,9 +360,7 @@ func lndMain() error {
idPrivKey.PubKey())
return <-errChan
},
SendToPeer: server.SendToPeer,
NotifyWhenOnline: server.NotifyWhenOnline,
FindPeer: server.FindPeer,
TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) {
dbChannels, err := chanDB.FetchAllChannels()
@ -450,12 +448,12 @@ func lndMain() error {
return delay
},
WatchNewChannel: func(channel *channeldb.OpenChannel,
addr *lnwire.NetAddress) error {
peerKey *btcec.PublicKey) error {
// First, we'll mark this new peer as a persistent peer
// for re-connection purposes.
server.mu.Lock()
pubStr := string(addr.IdentityKey.SerializeCompressed())
pubStr := string(peerKey.SerializeCompressed())
server.persistentPeers[pubStr] = struct{}{}
server.mu.Unlock()

@ -1,8 +1,11 @@
package lnpeer
import (
"net"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
@ -14,6 +17,10 @@ type Peer interface {
// has been sent to the remote peer.
SendMessage(sync bool, msg ...lnwire.Message) error
// AddNewChannel adds a new channel to the peer. The channel should fail
// to be added if the cancel channel is closed.
AddNewChannel(channel *lnwallet.LightningChannel, cancel <-chan struct{}) error
// WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer.
WipeChannel(*wire.OutPoint) error
@ -23,4 +30,7 @@ type Peer interface {
// IdentityKey returns the public key of the remote peer.
IdentityKey() *btcec.PublicKey
// Address returns the network address of the remote peer.
Address() net.Addr
}

56
peer.go

@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
@ -185,6 +186,9 @@ type peer struct {
wg sync.WaitGroup
}
// A compile-time check to ensure that peer satisfies the lnpeer.Peer interface.
var _ lnpeer.Peer = (*peer)(nil)
// newPeer creates a new peer from an establish connection object, and a
// pointer to the main server.
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
@ -907,15 +911,15 @@ out:
p.queueMsg(lnwire.NewPong(pongBytes), nil)
case *lnwire.OpenChannel:
p.server.fundingMgr.processFundingOpen(msg, p.addr)
p.server.fundingMgr.processFundingOpen(msg, p)
case *lnwire.AcceptChannel:
p.server.fundingMgr.processFundingAccept(msg, p.addr)
p.server.fundingMgr.processFundingAccept(msg, p)
case *lnwire.FundingCreated:
p.server.fundingMgr.processFundingCreated(msg, p.addr)
p.server.fundingMgr.processFundingCreated(msg, p)
case *lnwire.FundingSigned:
p.server.fundingMgr.processFundingSigned(msg, p.addr)
p.server.fundingMgr.processFundingSigned(msg, p)
case *lnwire.FundingLocked:
p.server.fundingMgr.processFundingLocked(msg, p.addr)
p.server.fundingMgr.processFundingLocked(msg, p)
case *lnwire.Shutdown:
select {
@ -931,8 +935,9 @@ out:
}
case *lnwire.Error:
switch {
key := p.addr.IdentityKey
switch {
// In the case of an all-zero channel ID we want to
// forward the error to all channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID:
@ -948,8 +953,8 @@ out:
// If the channel ID for the error message corresponds
// to a pending channel, then the funding manager will
// handle the error.
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, p.addr):
p.server.fundingMgr.processFundingError(msg, p.addr)
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key):
p.server.fundingMgr.processFundingError(msg, key)
// If not we hand the error to the channel link for
// this channel.
@ -1999,6 +2004,41 @@ func (p *peer) IdentityKey() *btcec.PublicKey {
return p.addr.IdentityKey
}
// Address returns the network address of the remote peer.
func (p *peer) Address() net.Addr {
return p.addr.Address
}
// AddNewChannel adds a new channel to the peer. The channel should fail to be
// added if the cancel channel is closed.
func (p *peer) AddNewChannel(channel *lnwallet.LightningChannel,
cancel <-chan struct{}) error {
newChanDone := make(chan struct{})
newChanMsg := &newChannelMsg{
channel: channel,
done: newChanDone,
}
select {
case p.newChannels <- newChanMsg:
case <-cancel:
return errors.New("canceled adding new channel")
case <-p.quit:
return ErrPeerExiting
}
// We pause here to wait for the peer to recognize the new channel
// before we close the channel barrier corresponding to the channel.
select {
case <-newChanDone:
case <-p.quit:
return ErrPeerExiting
}
return nil
}
// TODO(roasbeef): make all start/stop mutexes a CAS
// fetchLastChanUpdate returns a function which is able to retrieve the last

@ -110,7 +110,7 @@ type server struct {
inboundPeers map[string]*peer
outboundPeers map[string]*peer
peerConnectedListeners map[string][]chan<- struct{}
peerConnectedListeners map[string][]chan<- lnpeer.Peer
persistentPeers map[string]struct{}
persistentPeersBackoff map[string]time.Duration
@ -264,7 +264,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- struct{}),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
lnwire.GlobalFeatures),
@ -1568,31 +1568,38 @@ func (s *server) SendToPeer(target *btcec.PublicKey,
}
// NotifyWhenOnline can be called by other subsystems to get notified when a
// particular peer comes online.
// particular peer comes online. The peer itself is sent across the peerChan.
//
// NOTE: This function is safe for concurrent access.
func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
s.mu.Lock()
defer s.mu.Unlock()
// Compute the target peer's identifier.
pubStr := string(peer.SerializeCompressed())
pubStr := string(peerKey.SerializeCompressed())
// Check if peer is connected.
_, ok := s.peersByPub[pubStr]
peer, ok := s.peersByPub[pubStr]
if ok {
// Connected, can return early.
srvrLog.Debugf("Notifying that peer %x is online",
peer.SerializeCompressed())
close(connectedChan)
peerKey.SerializeCompressed())
select {
case peerChan <- peer:
case <-s.quit:
}
return
}
// Not connected, store this listener such that it can be notified when
// the peer comes online.
s.peerConnectedListeners[pubStr] = append(
s.peerConnectedListeners[pubStr], connectedChan)
s.peerConnectedListeners[pubStr], peerChan,
)
}
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
@ -2240,8 +2247,12 @@ func (s *server) addPeer(p *peer) {
}
// Check if there are listeners waiting for this peer to come online.
for _, con := range s.peerConnectedListeners[pubStr] {
close(con)
for _, peerChan := range s.peerConnectedListeners[pubStr] {
select {
case peerChan <- p:
case <-s.quit:
return
}
}
delete(s.peerConnectedListeners, pubStr)
}
@ -2509,7 +2520,7 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey,
// TODO(roasbeef): pass in chan that's closed if/when funding succeeds
// so can track as persistent peer?
go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req)
go s.fundingMgr.initFundingWorkflow(targetPeer, req)
return updateChan, errChan
}