Merge pull request #1345 from cfromknecht/gossip-direct-send-msg

discovery: bypass server for direct message sends

commit 9c0c9baee0
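This merge teaches the discovery and htlcswitch packages to talk to a remote peer directly through the new lnpeer.Peer abstraction, rather than funneling every outbound message through the server's SendToPeer callback. Direct sends — graph-sync batches from SynchronizeNode, gossip-query replies from the syncers, and ChannelUpdates for private channels — now go out via peer.SendMessage, while broadcast de-duplication is keyed on a separate source public key. Judging from the call sites and the test mockPeer added below, the lnpeer.Peer interface looks roughly as follows; this is an inferred sketch, since the lnpeer package itself is not part of this excerpt:

    // Sketch of lnpeer.Peer as inferred from this diff (the authoritative
    // definition lives in the new lnpeer package, not shown here).
    type Peer interface {
        // SendMessage sends the given messages to the remote peer. The
        // first argument indicates whether the call should block until
        // the messages have been sent.
        SendMessage(sync bool, msgs ...lnwire.Message) error

        // WipeChannel removes the channel identified by its channel
        // point from all indexes associated with the peer.
        WipeChannel(*wire.OutPoint) error

        // IdentityKey returns the peer's long-term identity public key.
        IdentityKey() *btcec.PublicKey

        // PubKey returns the compressed, serialized identity key.
        PubKey() [33]byte
    }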
@@ -14,6 +14,7 @@ import (
     "github.com/go-errors/errors"
     "github.com/lightningnetwork/lnd/chainntnfs"
     "github.com/lightningnetwork/lnd/channeldb"
+    "github.com/lightningnetwork/lnd/lnpeer"
     "github.com/lightningnetwork/lnd/lnwallet"
     "github.com/lightningnetwork/lnd/lnwire"
     "github.com/lightningnetwork/lnd/multimutex"
@@ -35,8 +36,9 @@ var (
 // networkMsg couples a routing related wire message with the peer that
 // originally sent it.
 type networkMsg struct {
-    peer *btcec.PublicKey
-    msg lnwire.Message
+    peer lnpeer.Peer
+    source *btcec.PublicKey
+    msg lnwire.Message
 
     isRemote bool
 
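The networkMsg split above is the heart of the change: peer is the live connection a remote message arrived on (used when the gossiper must answer that peer directly), while source is the public key the message is attributed to (used as the de-duplication sender and to skip the originator on rebroadcast). A minimal sketch of how the two fields are populated, following the two constructors later in this diff (names taken from those call sites):

    // Remote announcements carry both the connection and its identity key.
    nMsg := &networkMsg{
        msg:      msg,
        isRemote: true,
        peer:     peer,               // lnpeer.Peer to reply on
        source:   peer.IdentityKey(), // key used for de-duplication
        err:      make(chan error, 1),
    }

    // Local announcements have no originating connection, only a source.
    nMsg = &networkMsg{
        msg:      msg,
        isRemote: false,
        source:   source,
        err:      make(chan error, 1),
    }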
@@ -97,6 +99,11 @@ type Config struct {
     // messages to a particular peer identified by the target public key.
     SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
 
+    // FindPeer returns the actively registered peer for a given remote
+    // public key. An error is returned if the peer was not found or a
+    // shutdown has been requested.
+    FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error)
+
     // NotifyWhenOnline is a function that allows the gossiper to be
     // notified when a certain peer comes online, allowing it to
     // retry sending a peer message.
@@ -147,8 +154,8 @@ type AuthenticatedGossiper struct {
     // as we know it. To be used atomically.
     bestHeight uint32
 
     quit chan struct{}
     wg sync.WaitGroup
 
     // cfg is a copy of the configuration struct that the gossiper service
     // was initialized with.
@@ -244,7 +251,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
 // the entire network graph is read from disk, then serialized to the format
 // defined within the current wire protocol. This cache of graph data is then
 // sent directly to the target node.
-func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
+func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error {
     // TODO(roasbeef): need to also store sig data in db
     // * will be nice when we switch to pairing sigs would only need one ^_^
 
@@ -355,12 +362,12 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
     }
 
     log.Infof("Syncing channel graph state with %x, sending %v "+
-        "vertexes and %v edges", pub.SerializeCompressed(),
+        "vertexes and %v edges", syncPeer.PubKey(),
         numNodes, numEdges)
 
     // With all the announcement messages gathered, send them all in a
     // single batch to the target peer.
-    return d.cfg.SendToPeer(pub, announceMessages...)
+    return syncPeer.SendMessage(false, announceMessages...)
 }
 
 // PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the
@@ -452,12 +459,13 @@ func (d *AuthenticatedGossiper) Stop() {
 // peers. Remote channel announcements should contain the announcement proof
 // and be fully validated.
 func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
-    src *btcec.PublicKey) chan error {
+    peer lnpeer.Peer) chan error {
 
     nMsg := &networkMsg{
         msg: msg,
         isRemote: true,
-        peer: src,
+        peer: peer,
+        source: peer.IdentityKey(),
         err: make(chan error, 1),
     }
 
@@ -478,12 +486,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 // entire channel announcement and update messages will be re-constructed and
 // broadcast to the rest of the network.
 func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
-    src *btcec.PublicKey) chan error {
+    source *btcec.PublicKey) chan error {
 
     nMsg := &networkMsg{
         msg: msg,
         isRemote: false,
-        peer: src,
+        source: source,
         err: make(chan error, 1),
     }
 
@@ -587,7 +595,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
     // Channel announcements are identified by the short channel id field.
     case *lnwire.ChannelAnnouncement:
         deDupKey := msg.ShortChannelID
-        sender := routing.NewVertex(message.peer)
+        sender := routing.NewVertex(message.source)
 
         mws, ok := d.channelAnnouncements[deDupKey]
         if !ok {
@@ -609,7 +617,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
     // Channel updates are identified by the (short channel id, flags)
     // tuple.
     case *lnwire.ChannelUpdate:
-        sender := routing.NewVertex(message.peer)
+        sender := routing.NewVertex(message.source)
         deDupKey := channelUpdateID{
             msg.ShortChannelID,
             msg.Flags,
@@ -658,7 +666,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
     // Node announcements are identified by the Vertex field. Use the
     // NodeID to create the corresponding Vertex.
     case *lnwire.NodeAnnouncement:
-        sender := routing.NewVertex(message.peer)
+        sender := routing.NewVertex(message.source)
         deDupKey := routing.Vertex(msg.NodeID)
 
         // We do the same for node announcements as we did for channel
@@ -875,7 +883,7 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
 // gossip syncer for an inbound message so we can properly dispatch the
 // incoming message. If a gossip syncer isn't found, then one will be created
 // for the target peer.
-func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSyncer {
+func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) {
     target := routing.NewVertex(pub)
 
     // First, we'll try to find an existing gossiper for this peer.
@@ -885,16 +893,26 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy
 
     // If one exists, then we'll return it directly.
     if ok {
-        return syncer
+        return syncer, nil
     }
 
-    // Otherwise, we'll obtain the mutex, then check again if a gossiper
-    // was added after we dropped the read mutex.
+    // A known gossip syncer doesn't exist, so we may have to create one
+    // from scratch. To do so, we'll query for a reference directly to the
+    // active peer.
+    syncPeer, err := d.cfg.FindPeer(pub)
+    if err != nil {
+        log.Debugf("unable to find gossip peer %v: %v",
+            pub.SerializeCompressed(), err)
+        return nil, err
+    }
+
+    // Finally, we'll obtain the exclusive mutex, then check again if a
+    // gossiper was added after we dropped the read mutex.
     d.syncerMtx.Lock()
     syncer, ok = d.peerSyncers[target]
     if ok {
         d.syncerMtx.Unlock()
-        return syncer
+        return syncer, nil
     }
 
     // At this point, a syncer doesn't yet exist, so we'll create a new one
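findGossipSyncer now resolves the backing peer up front and keeps a classic double-checked lookup: a fast path under the read lock, then a second map check after taking the exclusive lock, because another goroutine may have registered a syncer in the window where no lock was held. Reduced to its core, the pattern is (a generic sketch assuming import "sync", not lnd code; the syncer type is a stand-in for the real gossipSyncer):

    // Stand-in for the real gossipSyncer type.
    type syncer struct{ key [33]byte }

    var (
        mu      sync.RWMutex
        syncers = make(map[[33]byte]*syncer)
    )

    func findSyncer(key [33]byte) *syncer {
        // Fast path: read lock only.
        mu.RLock()
        s, ok := syncers[key]
        mu.RUnlock()
        if ok {
            return s
        }

        // Slow path: take the exclusive lock, then re-check, since
        // another goroutine may have inserted a syncer in the interim.
        mu.Lock()
        defer mu.Unlock()
        if s, ok := syncers[key]; ok {
            return s
        }
        s = &syncer{key: key}
        syncers[key] = s
        return s
    }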
@@ -905,7 +923,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy
         channelSeries: d.cfg.ChanSeries,
         encodingType: lnwire.EncodingSortedPlain,
         sendToPeer: func(msgs ...lnwire.Message) error {
-            return d.cfg.SendToPeer(pub, msgs...)
+            return syncPeer.SendMessage(false, msgs...)
         },
     })
     copy(syncer.peerPub[:], pub.SerializeCompressed())
@@ -914,7 +932,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy
 
     d.syncerMtx.Unlock()
 
-    return syncer
+    return syncer, nil
 }
 
 // networkHandler is the primary goroutine that drives this service. The roles
@@ -992,15 +1010,20 @@ func (d *AuthenticatedGossiper) networkHandler() {
             // then we'll dispatch that directly to the proper
             // gossipSyncer.
             case *lnwire.GossipTimestampRange:
-                syncer := d.findGossipSyncer(announcement.peer)
+                syncer, err := d.findGossipSyncer(
+                    announcement.source,
+                )
+                if err != nil {
+                    continue
+                }
 
                 // If we've found the message target, then
                 // we'll dispatch the message directly to it.
-                err := syncer.ApplyGossipFilter(msg)
+                err = syncer.ApplyGossipFilter(msg)
                 if err != nil {
                     log.Warnf("unable to apply gossip "+
                         "filter for peer=%x: %v",
-                        announcement.peer.SerializeCompressed(), err)
+                        announcement.peer.PubKey(), err)
                 }
                 continue
 
@@ -1012,7 +1035,12 @@ func (d *AuthenticatedGossiper) networkHandler() {
                 *lnwire.ReplyChannelRange,
                 *lnwire.ReplyShortChanIDsEnd:
 
-                syncer := d.findGossipSyncer(announcement.peer)
+                syncer, err := d.findGossipSyncer(
+                    announcement.source,
+                )
+                if err != nil {
+                    continue
+                }
 
                 syncer.ProcessQueryMsg(announcement.msg)
                 continue
@@ -1193,19 +1221,19 @@ func (d *AuthenticatedGossiper) networkHandler() {
 // needed to handle new queries. The recvUpdates bool indicates if we should
 // continue to receive real-time updates from the remote peer once we've synced
 // channel state.
-func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) {
+func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) {
     d.syncerMtx.Lock()
     defer d.syncerMtx.Unlock()
 
     // If we already have a syncer, then we'll exit early as we don't want
     // to override it.
-    nodeID := routing.NewVertex(peer)
+    nodeID := routing.Vertex(syncPeer.PubKey())
     if _, ok := d.peerSyncers[nodeID]; ok {
         return
     }
 
     log.Infof("Creating new gossipSyncer for peer=%x",
-        peer.SerializeCompressed())
+        nodeID)
 
     syncer := newGossiperSyncer(gossipSyncerCfg{
         chainHash: d.cfg.ChainHash,
@@ -1213,10 +1241,10 @@ func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates
         channelSeries: d.cfg.ChanSeries,
         encodingType: lnwire.EncodingSortedPlain,
         sendToPeer: func(msgs ...lnwire.Message) error {
-            return d.cfg.SendToPeer(peer, msgs...)
+            return syncPeer.SendMessage(false, msgs...)
         },
     })
-    copy(syncer.peerPub[:], peer.SerializeCompressed())
+    copy(syncer.peerPub[:], nodeID[:])
     d.peerSyncers[nodeID] = syncer
 
     syncer.Start()
@@ -1429,8 +1457,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
         // We set ourselves as the source of this message to indicate
         // that we shouldn't skip any peers when sending this message.
         chanUpdates = append(chanUpdates, networkMsg{
-            peer: d.selfKey,
+            source: d.selfKey,
             msg: chanUpdate,
         })
     }
 
@@ -1502,19 +1530,19 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn
     // our peers.
     announcements := make([]networkMsg, 0, 3)
     announcements = append(announcements, networkMsg{
-        msg: chanAnn,
-        peer: d.selfKey,
+        source: d.selfKey,
+        msg: chanAnn,
     })
     if e1Ann != nil {
         announcements = append(announcements, networkMsg{
-            msg: e1Ann,
-            peer: d.selfKey,
+            source: d.selfKey,
+            msg: e1Ann,
         })
     }
     if e2Ann != nil {
         announcements = append(announcements, networkMsg{
-            msg: e2Ann,
-            peer: d.selfKey,
+            source: d.selfKey,
+            msg: e2Ann,
         })
 
     }
@@ -1591,8 +1619,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
         // Node announcement was successfully proceeded and know it
         // might be broadcast to other connected nodes.
         announcements = append(announcements, networkMsg{
-            msg: msg,
-            peer: nMsg.peer,
+            peer: nMsg.peer,
+            source: nMsg.source,
+            msg: msg,
         })
 
         nMsg.err <- nil
@@ -1776,7 +1805,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
                         msg, nMsg.peer)
                 } else {
                     err = <-d.ProcessLocalAnnouncement(
-                        msg, nMsg.peer)
+                        msg, nMsg.source)
                 }
                 if err != nil {
                     log.Errorf("Failed reprocessing"+
@@ -1801,8 +1830,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
         // announcement with proof (remote).
         if proof != nil {
             announcements = append(announcements, networkMsg{
-                msg: msg,
-                peer: nMsg.peer,
+                peer: nMsg.peer,
+                source: nMsg.source,
+                msg: msg,
             })
         }
 
@@ -1987,20 +2017,29 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
         // so we'll try sending the update directly to the remote peer.
         if !nMsg.isRemote && chanInfo.AuthProof == nil {
             // Get our peer's public key.
-            var remotePeer *btcec.PublicKey
+            var remotePub *btcec.PublicKey
             switch {
             case msg.Flags&lnwire.ChanUpdateDirection == 0:
-                remotePeer, _ = chanInfo.NodeKey2()
+                remotePub, _ = chanInfo.NodeKey2()
             case msg.Flags&lnwire.ChanUpdateDirection == 1:
-                remotePeer, _ = chanInfo.NodeKey1()
+                remotePub, _ = chanInfo.NodeKey1()
             }
 
-            // Send ChannelUpdate directly to remotePeer.
-            // TODO(halseth): make reliable send?
-            if err = d.cfg.SendToPeer(remotePeer, msg); err != nil {
-                log.Errorf("unable to send channel update "+
-                    "message to peer %x: %v",
-                    remotePeer.SerializeCompressed(), err)
+            sPeer, err := d.cfg.FindPeer(remotePub)
+            if err != nil {
+                log.Errorf("unable to send channel update -- "+
+                    "could not find peer %x: %v",
+                    remotePub, err)
+            } else {
+                // Send ChannelUpdate directly to remotePeer.
+                // TODO(halseth): make reliable send?
+                err = sPeer.SendMessage(false, msg)
+                if err != nil {
+                    log.Errorf("unable to send channel "+
+                        "update message to peer %x: %v",
+                        remotePub.SerializeCompressed(),
+                        err)
+                }
             }
         }
 
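For the private-channel ChannelUpdate path above, the gossiper now asks for a live handle via cfg.FindPeer and sends on it directly; the send remains best-effort, as the surviving TODO about a reliable send notes. The FindPeer callback itself is expected to be supplied by the server when the gossiper is constructed — the server-side wiring is not part of this excerpt, so the following is only a hypothetical sketch mirroring the test harness:

    gossiper, err := discovery.New(discovery.Config{
        // ...
        FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) {
            // Hypothetical: resolve the currently connected peer by its
            // identity key, failing if it is offline or shutting down.
            return server.FindPeer(pub)
        },
    }, selfKey)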
@@ -2010,8 +2049,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
         // has an attached authentication proof.
         if chanInfo.AuthProof != nil {
             announcements = append(announcements, networkMsg{
-                msg: msg,
-                peer: nMsg.peer,
+                peer: nMsg.peer,
+                source: nMsg.source,
+                msg: msg,
             })
         }
 
@@ -2083,10 +2123,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
             return nil
         }
 
-        isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
-            chanInfo.NodeKey1Bytes[:])
-        isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
-            chanInfo.NodeKey2Bytes[:])
+        nodeID := nMsg.source.SerializeCompressed()
+        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
+        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
 
         // Ensure that channel that was retrieved belongs to the peer
         // which sent the proof announcement.
@@ -2130,7 +2169,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
         // received our local proof yet. So be kind and send
         // them the full proof.
         if nMsg.isRemote {
-            peerID := nMsg.peer.SerializeCompressed()
+            peerID := nMsg.source.SerializeCompressed()
             log.Debugf("Got AnnounceSignatures for " +
                 "channel with full proof.")
 
@@ -2151,7 +2190,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
                     log.Errorf("unable to gen ann: %v", err)
                     return
                 }
-                err = d.cfg.SendToPeer(nMsg.peer, chanAnn)
+                err = nMsg.peer.SendMessage(false, chanAnn)
                 if err != nil {
                     log.Errorf("Failed sending "+
                         "full proof to "+
@@ -2271,19 +2310,22 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
         // Assemble the necessary announcements to add to the next
         // broadcasting batch.
         announcements = append(announcements, networkMsg{
-            msg: chanAnn,
-            peer: nMsg.peer,
+            peer: nMsg.peer,
+            source: nMsg.source,
+            msg: chanAnn,
         })
         if e1Ann != nil {
             announcements = append(announcements, networkMsg{
-                msg: e1Ann,
-                peer: nMsg.peer,
+                peer: nMsg.peer,
+                source: nMsg.source,
+                msg: e1Ann,
             })
         }
         if e2Ann != nil {
             announcements = append(announcements, networkMsg{
-                msg: e2Ann,
-                peer: nMsg.peer,
+                peer: nMsg.peer,
+                source: nMsg.source,
+                msg: e2Ann,
             })
         }
 
@@ -2493,15 +2535,10 @@ func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) e
         // If this syncer is already at the terminal state, then we'll
         // chose it to request the fully channel update.
         if syncer.SyncState() == chansSynced {
-            pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256())
-            if err != nil {
-                return err
-            }
-
             log.Debugf("attempting to request chan ann for "+
                 "chan_id=%v from node=%x", cid, nodeID[:])
 
-            return d.cfg.SendToPeer(pub, &lnwire.QueryShortChanIDs{
+            return syncer.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
                 ChainHash: d.cfg.ChainHash,
                 EncodingType: lnwire.EncodingSortedPlain,
                 ShortChanIDs: []lnwire.ShortChannelID{cid},
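The maybeRequestChanAnn hunk above also removes a btcec.ParsePubKey round-trip: rather than reconstructing a public key from the stored nodeID just to call SendToPeer, the request reuses the syncer's sendToPeer closure, which captured the lnpeer.Peer when the syncer was built. The pattern, reduced to its two halves (a sketch; syncPeer, chainHash and cid are assumed to be in scope):

    // At construction time the peer handle is captured once...
    cfg := gossipSyncerCfg{
        sendToPeer: func(msgs ...lnwire.Message) error {
            return syncPeer.SendMessage(false, msgs...)
        },
    }

    // ...so later callers need neither key parsing nor a server lookup:
    err := syncer.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
        ChainHash:    chainHash,
        EncodingType: lnwire.EncodingSortedPlain,
        ShortChanIDs: []lnwire.ShortChannelID{cid},
    })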
@@ -22,6 +22,7 @@ import (
     "github.com/go-errors/errors"
     "github.com/lightningnetwork/lnd/chainntnfs"
     "github.com/lightningnetwork/lnd/channeldb"
+    "github.com/lightningnetwork/lnd/lnpeer"
     "github.com/lightningnetwork/lnd/lnwire"
     "github.com/lightningnetwork/lnd/routing"
     "github.com/roasbeef/btcd/btcec"
@@ -535,6 +536,9 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
         SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error {
             return nil
         },
+        FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) {
+            return &mockPeer{target, nil, nil}, nil
+        },
         Router: router,
         TrickleDelay: trickleDelay,
         RetransmitDelay: retransmitDelay,
@@ -583,7 +587,7 @@ func TestProcessAnnouncement(t *testing.T) {
         }
     }
 
     // Create node valid, signed announcement, process it with
     // gossiper service, check that valid announcement have been
     // propagated farther into the lightning network, and check that we
     // added new node into router.
@@ -592,10 +596,10 @@ func TestProcessAnnouncement(t *testing.T) {
         t.Fatalf("can't create node announcement: %v", err)
     }
 
-    nodePub := nodeKeyPriv1.PubKey()
+    nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
 
     select {
-    case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, nodePub):
+    case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, nodePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("remote announcement not processed")
     }
@@ -605,7 +609,7 @@ func TestProcessAnnouncement(t *testing.T) {
 
     select {
     case msg := <-ctx.broadcastedMessage:
-        assertSenderExistence(nodePub, msg)
+        assertSenderExistence(nodePeer.IdentityKey(), msg)
     case <-time.After(2 * trickleDelay):
         t.Fatal("announcement wasn't proceeded")
     }
@@ -623,7 +627,7 @@ func TestProcessAnnouncement(t *testing.T) {
     }
 
     select {
-    case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePub):
+    case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("remote announcement not processed")
     }
@@ -633,7 +637,7 @@ func TestProcessAnnouncement(t *testing.T) {
 
     select {
     case msg := <-ctx.broadcastedMessage:
-        assertSenderExistence(nodePub, msg)
+        assertSenderExistence(nodePeer.IdentityKey(), msg)
     case <-time.After(2 * trickleDelay):
         t.Fatal("announcement wasn't proceeded")
     }
@@ -651,7 +655,7 @@ func TestProcessAnnouncement(t *testing.T) {
     }
 
     select {
-    case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePub):
+    case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("remote announcement not processed")
     }
@@ -661,7 +665,7 @@ func TestProcessAnnouncement(t *testing.T) {
 
     select {
     case msg := <-ctx.broadcastedMessage:
-        assertSenderExistence(nodePub, msg)
+        assertSenderExistence(nodePeer.IdentityKey(), msg)
     case <-time.After(2 * trickleDelay):
         t.Fatal("announcement wasn't proceeded")
     }
@@ -690,7 +694,7 @@ func TestPrematureAnnouncement(t *testing.T) {
         t.Fatalf("can't create node announcement: %v", err)
     }
 
-    nodePub := nodeKeyPriv1.PubKey()
+    nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
 
     // Pretending that we receive the valid channel announcement from
     // remote side, but block height of this announcement is greater than
@@ -702,7 +706,7 @@ func TestPrematureAnnouncement(t *testing.T) {
     }
 
     select {
-    case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePub):
+    case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
         t.Fatal("announcement was proceeded")
     case <-time.After(100 * time.Millisecond):
     }
@@ -721,7 +725,7 @@ func TestPrematureAnnouncement(t *testing.T) {
     }
 
     select {
-    case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePub):
+    case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
         t.Fatal("announcement was proceeded")
     case <-time.After(100 * time.Millisecond):
     }
@@ -770,6 +774,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
     // Set up a channel that we can use to inspect the messages
     // sent directly fromn the gossiper.
     sentMsgs := make(chan lnwire.Message, 10)
+    ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
+        return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
+    }
     ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error {
         select {
         case sentMsgs <- msg[0]:
@@ -792,6 +799,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
     if err != nil {
         t.Fatalf("unable to parse pubkey: %v", err)
     }
+    remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
 
     // Recreate lightning network topology. Initialize router with channel
     // between two nodes.
@@ -840,7 +848,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -887,7 +895,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -932,6 +940,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
     // Set up a channel that we can use to inspect the messages
     // sent directly from the gossiper.
     sentMsgs := make(chan lnwire.Message, 10)
+    ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
+        return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
+    }
     ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error {
         select {
         case sentMsgs <- msg[0]:
@@ -954,6 +965,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
     if err != nil {
         t.Fatalf("unable to parse pubkey: %v", err)
     }
+    remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
 
     // Pretending that we receive local channel announcement from funding
     // manager, thereby kick off the announcement exchange process, in
@@ -961,7 +973,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
     // because we haven't announce the channel yet.
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -1032,7 +1044,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -1116,6 +1128,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
     if err != nil {
         t.Fatalf("unable to parse pubkey: %v", err)
     }
+    remotePeer := &mockPeer{remoteKey, nil, nil}
 
     // Recreate lightning network topology. Initialize router with channel
     // between two nodes.
@@ -1151,7 +1164,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -1248,7 +1261,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
     // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate).
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process local announcement")
     }
@@ -1304,6 +1317,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
     if err != nil {
         t.Fatalf("unable to parse pubkey: %v", err)
     }
+    remotePeer := &mockPeer{remoteKey, nil, nil}
 
     // Recreate lightning network topology. Initialize router with channel
     // between two nodes.
@@ -1339,7 +1353,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -1472,7 +1486,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
     // broadcast should continue as normal.
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -1529,6 +1543,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
     if err != nil {
         t.Fatalf("unable to parse pubkey: %v", err)
     }
+    remotePeer := &mockPeer{remoteKey, nil, nil}
 
     // Recreate lightning network topology. Initialize router with channel
     // between two nodes.
@@ -1564,7 +1579,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process remote announcement")
     }
@@ -1579,6 +1594,8 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
     // Set up a channel we can use to inspect messages sent by the
     // gossiper to the remote peer.
     sentToPeer := make(chan lnwire.Message, 1)
+    remotePeer.sentMsgs = sentToPeer
+    remotePeer.quit = ctx.gossiper.quit
     ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
         msg ...lnwire.Message) error {
         select {
@@ -1609,7 +1626,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process local announcement")
     }
@@ -1654,7 +1671,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
     // trigger a send of the full ChannelAnnouncement.
     select {
     case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-        remoteKey):
+        remotePeer):
     case <-time.After(2 * time.Second):
         t.Fatal("did not process local announcement")
     }
@@ -1702,7 +1719,13 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create remote channel announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: ca, peer: bitcoinKeyPub2})
+
+    nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil}
+    announcements.AddMsgs(networkMsg{
+        msg: ca,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.channelAnnouncements) != 1 {
         t.Fatal("new channel announcement not stored in batch")
     }
@@ -1715,7 +1738,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create remote channel announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: ca2, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: ca2,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.channelAnnouncements) != 1 {
         t.Fatal("channel announcement not replaced in batch")
     }
@@ -1727,7 +1754,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create update announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: ua, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: ua,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.channelUpdates) != 1 {
         t.Fatal("new channel update not stored in batch")
     }
@@ -1738,7 +1769,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create update announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: ua2, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: ua2,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.channelUpdates) != 1 {
         t.Fatal("channel update not replaced in batch")
     }
@@ -1749,7 +1784,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create update announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: ua3, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: ua3,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.channelUpdates) != 1 {
         t.Fatal("channel update not replaced in batch")
     }
@@ -1779,7 +1818,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create update announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: ua4, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: ua4,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.channelUpdates) != 1 {
         t.Fatal("channel update not in batch")
     }
@@ -1791,7 +1834,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create node announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: na, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: na,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.nodeAnnouncements) != 1 {
         t.Fatal("new node announcement not stored in batch")
     }
@@ -1801,7 +1848,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create node announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: na2, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: na2,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.nodeAnnouncements) != 2 {
         t.Fatal("second node announcement not stored in batch")
     }
@@ -1812,7 +1863,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create node announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: na3, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: na3,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.nodeAnnouncements) != 2 {
         t.Fatal("second node announcement not replaced in batch")
     }
@@ -1824,7 +1879,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create node announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: na4, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: na4,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.nodeAnnouncements) != 2 {
         t.Fatal("second node announcement not replaced again in batch")
     }
@@ -1835,7 +1894,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
     if err != nil {
         t.Fatalf("can't create node announcement: %v", err)
     }
-    announcements.AddMsgs(networkMsg{msg: na5, peer: bitcoinKeyPub2})
+    announcements.AddMsgs(networkMsg{
+        msg: na5,
+        peer: nodePeer,
+        source: nodePeer.IdentityKey(),
+    })
     if len(announcements.nodeAnnouncements) != 2 {
         t.Fatal("node announcement not replaced in batch")
     }
@@ -1910,6 +1973,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
     // Set up a channel that we can use to inspect the messages
     // sent directly fromn the gossiper.
     sentMsgs := make(chan lnwire.Message, 10)
+    ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) {
+        return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil
+    }
     ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error {
         select {
         case sentMsgs <- msg[0]:
@@ -1932,11 +1998,12 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
     if err != nil {
         t.Fatalf("unable to parse pubkey: %v", err)
     }
+    remotePeer := &mockPeer{remoteKey, nil, nil}
 
     // Recreate the case where the remote node is sending us its ChannelUpdate
     // before we have been able to process our own ChannelAnnouncement and
     // ChannelUpdate.
-    err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey)
+    err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer)
     if err != nil {
         t.Fatalf("unable to process :%v", err)
     }
@@ -2042,7 +2109,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
         t.Fatal("wrong number of objects in storage")
     }
 
-    err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
+    err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remotePeer)
     if err != nil {
         t.Fatalf("unable to process :%v", err)
     }
@@ -2069,3 +2136,33 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
         t.Fatal("waiting proof should be removed from storage")
     }
 }
+
+// mockPeer implements the lnpeer.Peer interface and is used to test the
+// gossiper's interaction with peers.
+type mockPeer struct {
+    pk *btcec.PublicKey
+    sentMsgs chan lnwire.Message
+    quit chan struct{}
+}
+
+func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
+    if p.sentMsgs == nil && p.quit == nil {
+        return nil
+    }
+
+    for _, msg := range msgs {
+        select {
+        case p.sentMsgs <- msg:
+        case <-p.quit:
+        }
+    }
+
+    return nil
+}
+func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil }
+func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk }
+func (p *mockPeer) PubKey() [33]byte {
+    var pubkey [33]byte
+    copy(pubkey[:], p.pk.SerializeCompressed())
+    return pubkey
+}
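With mockPeer in place, the tests can hand the gossiper a fake peer and observe exactly what would have been written to the wire, with no server or TCP connection involved. A typical shape, following the tests above (ann, remoteKey, quit and t are assumed from the surrounding test):

    sentMsgs := make(chan lnwire.Message, 10)
    remotePeer := &mockPeer{remoteKey, sentMsgs, quit}

    // Feed a remote announcement through the gossiper as though it
    // arrived from remotePeer...
    if err := <-gossiper.ProcessRemoteAnnouncement(ann, remotePeer); err != nil {
        t.Fatalf("unable to process: %v", err)
    }

    // ...then assert on whatever was sent directly back to that peer.
    select {
    case msg := <-sentMsgs:
        _ = msg // inspect the direct reply here
    case <-time.After(2 * time.Second):
        t.Fatal("no direct message sent")
    }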
@ -2,9 +2,9 @@ package htlcswitch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
"github.com/lightningnetwork/lnd/lnpeer"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/roasbeef/btcd/chaincfg/chainhash"
|
"github.com/roasbeef/btcd/chaincfg/chainhash"
|
||||||
"github.com/roasbeef/btcd/wire"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// InvoiceDatabase is an interface which represents the persistent subsystem
|
// InvoiceDatabase is an interface which represents the persistent subsystem
|
||||||
@ -97,7 +97,7 @@ type ChannelLink interface {
|
|||||||
|
|
||||||
// Peer returns the representation of remote peer with which we have
|
// Peer returns the representation of remote peer with which we have
|
||||||
// the channel link opened.
|
// the channel link opened.
|
||||||
Peer() Peer
|
Peer() lnpeer.Peer
|
||||||
|
|
||||||
// EligibleToForward returns a bool indicating if the channel is able
|
// EligibleToForward returns a bool indicating if the channel is able
|
||||||
// to actively accept requests to forward HTLC's. A channel may be
|
// to actively accept requests to forward HTLC's. A channel may be
|
||||||
@@ -116,22 +116,6 @@ type ChannelLink interface {
 	Stop()
 }
 
-// Peer is an interface which represents the remote lightning node inside our
-// system.
-type Peer interface {
-	// SendMessage sends message to remote peer. The second argument
-	// denotes if the method should block until the message has been sent
-	// to the remote peer.
-	SendMessage(msg lnwire.Message, sync bool) error
-
-	// WipeChannel removes the channel uniquely identified by its channel
-	// point from all indexes associated with the peer.
-	WipeChannel(*wire.OutPoint) error
-
-	// PubKey returns the serialize public key of the source peer.
-	PubKey() [33]byte
-}
-
 // ForwardingLog is an interface that represents a time series database which
 // keep track of all successfully completed payment circuits. Every few
 // seconds, the switch will collate and flush out all the successful payment
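
The interface removed here is not dropped from the codebase: it reappears further down, extended with an IdentityKey accessor and a variadic SendMessage, as the new lnpeer package that both htlcswitch and discovery now share.
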
@@ -15,6 +15,7 @@ import (
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch/hodl"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/roasbeef/btcd/chaincfg/chainhash"
@@ -163,7 +164,7 @@ type ChannelLinkConfig struct {
 
	// Peer is a lightning network node with which we have the channel link
	// opened.
-	Peer Peer
+	Peer lnpeer.Peer
 
	// Registry is a sub-system which responsible for managing the invoices
	// in thread-safe manner.
@@ -534,7 +535,7 @@ func (l *channelLink) syncChanStates() error {
		return fmt.Errorf("unable to generate chan sync message for "+
			"ChannelPoint(%v)", l.channel.ChannelPoint())
	}
-	if err := l.cfg.Peer.SendMessage(localChanSyncMsg, false); err != nil {
+	if err := l.cfg.Peer.SendMessage(false, localChanSyncMsg); err != nil {
		return fmt.Errorf("Unable to send chan sync message for "+
			"ChannelPoint(%v)", l.channel.ChannelPoint())
	}
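
Every remaining call-site change in this file is the same mechanical flip: the blocking flag moves to the first position so the message argument can become variadic. A hedged sketch of the convention (message names illustrative only):

	// Before: SendMessage(msg lnwire.Message, sync bool) error
	// After:  SendMessage(sync bool, msgs ...lnwire.Message) error
	l.cfg.Peer.SendMessage(false, msgA)             // async, single message
	l.cfg.Peer.SendMessage(false, msgA, msgB, msgC) // async, queued in order
	if err := l.cfg.Peer.SendMessage(true, msgA); err != nil {
		// sync send: blocks until the write handler replies
	}
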
@@ -576,7 +577,7 @@ func (l *channelLink) syncChanStates() error {
			fundingLockedMsg := lnwire.NewFundingLocked(
				l.ChanID(), nextRevocation,
			)
-			err = l.cfg.Peer.SendMessage(fundingLockedMsg, false)
+			err = l.cfg.Peer.SendMessage(false, fundingLockedMsg)
			if err != nil {
				return fmt.Errorf("unable to re-send "+
					"FundingLocked: %v", err)
@@ -626,7 +627,7 @@ func (l *channelLink) syncChanStates() error {
		// immediately so we return to a synchronized state as soon as
		// possible.
		for _, msg := range msgsToReSend {
-			l.cfg.Peer.SendMessage(msg, false)
+			l.cfg.Peer.SendMessage(false, msg)
		}
 
	case <-l.quit:
@@ -1107,7 +1108,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
		l.openedCircuits = append(l.openedCircuits, pkt.inKey())
		l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
 
-		l.cfg.Peer.SendMessage(htlc, false)
+		l.cfg.Peer.SendMessage(false, htlc)
 
	case *lnwire.UpdateFulfillHTLC:
		// If hodl.SettleOutgoing mode is active, we exit early to
@@ -1148,7 +1149,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
 
		// Then we send the HTLC settle message to the connected peer
		// so we can continue the propagation of the settle message.
-		l.cfg.Peer.SendMessage(htlc, false)
+		l.cfg.Peer.SendMessage(false, htlc)
		isSettle = true
 
	case *lnwire.UpdateFailHTLC:
@@ -1189,7 +1190,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
 
		// Finally, we send the HTLC message to the peer which
		// initially created the HTLC.
-		l.cfg.Peer.SendMessage(htlc, false)
+		l.cfg.Peer.SendMessage(false, htlc)
		isSettle = true
	}
 
@@ -1342,7 +1343,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
			log.Errorf("unable to revoke commitment: %v", err)
			return
		}
-		l.cfg.Peer.SendMessage(nextRevocation, false)
+		l.cfg.Peer.SendMessage(false, nextRevocation)
 
		// Since we just revoked our commitment, we may have a new set
		// of HTLC's on our commitment, so we'll send them over our
@@ -1561,7 +1562,7 @@ func (l *channelLink) updateCommitTx() error {
		CommitSig: theirCommitSig,
		HtlcSigs:  htlcSigs,
	}
-	l.cfg.Peer.SendMessage(commitSig, false)
+	l.cfg.Peer.SendMessage(false, commitSig)
 
	// We've just initiated a state transition, attempt to stop the
	// logCommitTimer. If the timer already ticked, then we'll consume the
@@ -1585,7 +1586,7 @@ func (l *channelLink) updateCommitTx() error {
 // channel link opened.
 //
 // NOTE: Part of the ChannelLink interface.
-func (l *channelLink) Peer() Peer {
+func (l *channelLink) Peer() lnpeer.Peer {
	return l.cfg.Peer
 }
 
@@ -1852,7 +1853,7 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
	// We'll then attempt to send a new UpdateFee message, and also lock it
	// in immediately by triggering a commitment update.
	msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
-	if err := l.cfg.Peer.SendMessage(msg, false); err != nil {
+	if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
		return err
	}
	return l.updateCommitTx()
@@ -2260,11 +2261,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
 
				// HTLC was successfully settled locally send
				// notification about it remote peer.
-				l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{
+				l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
					ChanID:          l.ChanID(),
					ID:              pd.HtlcIndex,
					PaymentPreimage: preimage,
-				}, false)
+				})
				needUpdate = true
 
				// There are additional channels left within this route. So
@@ -2550,11 +2551,11 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess
		return
	}
 
-	l.cfg.Peer.SendMessage(&lnwire.UpdateFailHTLC{
+	l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailHTLC{
		ChanID: l.ChanID(),
		ID:     htlcIndex,
		Reason: reason,
-	}, false)
+	})
 }
 
 // sendMalformedHTLCError helper function which sends the malformed HTLC update
@@ -2569,12 +2570,12 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
		return
	}
 
-	l.cfg.Peer.SendMessage(&lnwire.UpdateFailMalformedHTLC{
+	l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
		ChanID:       l.ChanID(),
		ID:           htlcIndex,
		ShaOnionBlob: shaOnionBlob,
		FailureCode:  code,
-	}, false)
+	})
 }
 
 // fail is a function which is used to encapsulate the action necessary for
@@ -22,6 +22,7 @@ import (
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch/hodl"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/roasbeef/btcd/btcec"
@@ -1396,14 +1397,14 @@ type mockPeer struct {
	quit chan struct{}
 }
 
-var _ Peer = (*mockPeer)(nil)
+var _ lnpeer.Peer = (*mockPeer)(nil)
 
-func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error {
+func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {
	if m.disconnected {
		return fmt.Errorf("disconnected")
	}
	select {
-	case m.sentMsgs <- msg:
+	case m.sentMsgs <- msgs[0]:
	case <-m.quit:
		return fmt.Errorf("mockPeer shutting down")
	}
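
Note that, unlike the gossiper's mockPeer above, this htlcswitch mock records only msgs[0]; that is sufficient here because the link tests only ever pass a single message per call.
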
@@ -1415,8 +1416,11 @@ func (m *mockPeer) WipeChannel(*wire.OutPoint) error {
 func (m *mockPeer) PubKey() [33]byte {
	return [33]byte{}
 }
+func (m *mockPeer) IdentityKey() *btcec.PublicKey {
+	return nil
+}
 
-var _ Peer = (*mockPeer)(nil)
+var _ lnpeer.Peer = (*mockPeer)(nil)
 
 func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
	ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error,
@@ -20,6 +20,7 @@ import (
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/contractcourt"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/roasbeef/btcd/btcec"
@@ -119,7 +120,7 @@ type mockServer struct {
	interceptorFuncs []messageInterceptor
 }
 
-var _ Peer = (*mockServer)(nil)
+var _ lnpeer.Peer = (*mockServer)(nil)
 
 func initSwitchWithDB(db *channeldb.DB) (*Switch, error) {
	if db == nil {
@@ -450,12 +451,14 @@ func (s *mockServer) intersect(f messageInterceptor) {
	s.interceptorFuncs = append(s.interceptorFuncs, f)
 }
 
-func (s *mockServer) SendMessage(message lnwire.Message, sync bool) error {
+func (s *mockServer) SendMessage(sync bool, msgs ...lnwire.Message) error {
 
-	select {
-	case s.messages <- message:
-	case <-s.quit:
-		return errors.New("server is stopped")
+	for _, msg := range msgs {
+		select {
+		case s.messages <- msg:
+		case <-s.quit:
+			return errors.New("server is stopped")
+		}
	}
 
	return nil
@@ -506,6 +509,11 @@ func (s *mockServer) PubKey() [33]byte {
	return s.id
 }
 
+func (s *mockServer) IdentityKey() *btcec.PublicKey {
+	pubkey, _ := btcec.ParsePubKey(s.id[:], btcec.S256())
+	return pubkey
+}
+
 func (s *mockServer) WipeChannel(*wire.OutPoint) error {
	return nil
 }
@@ -532,7 +540,7 @@ type mockChannelLink struct {
 
	chanID lnwire.ChannelID
 
-	peer Peer
+	peer lnpeer.Peer
 
	startMailBox bool
 
@@ -579,7 +587,7 @@ func (f *mockChannelLink) deleteCircuit(pkt *htlcPacket) error {
 }
 
 func newMockChannelLink(htlcSwitch *Switch, chanID lnwire.ChannelID,
-	shortChanID lnwire.ShortChannelID, peer Peer, eligible bool,
+	shortChanID lnwire.ShortChannelID, peer lnpeer.Peer, eligible bool,
 ) *mockChannelLink {
 
	return &mockChannelLink{
@@ -624,7 +632,7 @@ func (f *mockChannelLink) Start() error {
 func (f *mockChannelLink) ChanID() lnwire.ChannelID { return f.chanID }
 func (f *mockChannelLink) ShortChanID() lnwire.ShortChannelID { return f.shortChanID }
 func (f *mockChannelLink) Bandwidth() lnwire.MilliSatoshi { return 99999999 }
-func (f *mockChannelLink) Peer() Peer { return f.peer }
+func (f *mockChannelLink) Peer() lnpeer.Peer { return f.peer }
 func (f *mockChannelLink) Stop() {}
 func (f *mockChannelLink) EligibleToForward() bool { return f.eligible }
 func (f *mockChannelLink) setLiveShortChanID(sid lnwire.ShortChannelID) { f.shortChanID = sid }
@@ -21,6 +21,7 @@ import (
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/keychain"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/shachain"
@@ -672,7 +673,7 @@ func (r *paymentResponse) Wait(d time.Duration) (chainhash.Hash, error) {
 // * from Alice to Bob
 // * from Alice to Carol through the Bob
 // * from Alice to some another peer through the Bob
-func (n *threeHopNetwork) makePayment(sendingPeer, receivingPeer Peer,
+func (n *threeHopNetwork) makePayment(sendingPeer, receivingPeer lnpeer.Peer,
	firstHopPub [33]byte, hops []ForwardingInfo,
	invoiceAmt, htlcAmt lnwire.MilliSatoshi,
	timelock uint32) *paymentResponse {
lnpeer/peer.go (new file, 26 additions)
@@ -0,0 +1,26 @@
+package lnpeer
+
+import (
+	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/roasbeef/btcd/btcec"
+	"github.com/roasbeef/btcd/wire"
+)
+
+// Peer is an interface which represents the remote lightning node inside our
+// system.
+type Peer interface {
+	// SendMessage sends a variadic number of message to remote peer. The
+	// first argument denotes if the method should block until the message
+	// has been sent to the remote peer.
+	SendMessage(sync bool, msg ...lnwire.Message) error
+
+	// WipeChannel removes the channel uniquely identified by its channel
+	// point from all indexes associated with the peer.
+	WipeChannel(*wire.OutPoint) error
+
+	// PubKey returns the serialized public key of the remote peer.
+	PubKey() [33]byte
+
+	// IdentityKey returns the public key of the remote peer.
+	IdentityKey() *btcec.PublicKey
+}
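
With the interface in its own package, any subsystem can push wire messages straight to a connected peer without going through the server's queueing helpers. A minimal sketch of a direct-send helper, assuming only the interface above (sendDirect itself is hypothetical):

	// sendDirect queues a batch of messages on the peer's outgoing
	// queue, bypassing the server's SendToPeer plumbing.
	func sendDirect(p Peer, msgs ...lnwire.Message) error {
		// sync=false: enqueue and return immediately.
		return p.SendMessage(false, msgs...)
	}
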
peer.go (57 changed lines)
@@ -21,6 +21,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/roasbeef/btcd/btcec"
 	"github.com/roasbeef/btcd/chaincfg/chainhash"
 	"github.com/roasbeef/btcd/connmgr"
 	"github.com/roasbeef/btcd/txscript"
@@ -92,7 +93,6 @@ type peer struct {
	bytesReceived uint64
	bytesSent     uint64
 
-
	// pingTime is a rough estimate of the RTT (round-trip-time) between us
	// and the connected peer. This time is expressed in micro seconds.
	// To be used atomically.
@@ -502,10 +502,10 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
			if linkErr.SendData != nil {
				data = linkErr.SendData
			}
-			err := p.SendMessage(&lnwire.Error{
+			err := p.SendMessage(true, &lnwire.Error{
				ChanID: chanID,
				Data:   data,
-			}, true)
+			})
			if err != nil {
				peerLog.Errorf("unable to send msg to "+
					"remote peer: %v", err)
@@ -833,8 +833,7 @@ func newDiscMsgStream(p *peer) *msgStream {
		"Update stream for gossiper exited",
		1000,
		func(msg lnwire.Message) {
-			p.server.authGossiper.ProcessRemoteAnnouncement(msg,
-				p.addr.IdentityKey)
+			p.server.authGossiper.ProcessRemoteAnnouncement(msg, p)
		},
	)
 }
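
The msgStream callback now hands the gossiper the peer object itself instead of just its identity key, so replies generated while processing an announcement can be written straight back to that peer.
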
@@ -1958,23 +1957,38 @@ func (p *peer) sendInitMsg() error {
	return p.writeMessage(msg)
 }
 
-// SendMessage sends message to remote peer. The second argument denotes if the
-// method should block until the message has been sent to the remote peer.
-func (p *peer) SendMessage(msg lnwire.Message, sync bool) error {
-	if !sync {
-		p.queueMsg(msg, nil)
-		return nil
+// SendMessage sends a variadic number of message to remote peer. The first
+// argument denotes if the method should block until the message has been sent
+// to the remote peer.
+func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
+	// Add all incoming messages to the outgoing queue. A list of error
+	// chans is populated for each message if the caller requested a sync
+	// send.
+	var errChans []chan error
+	for _, msg := range msgs {
+		// If a sync send was requested, create an error chan to listen
+		// for an ack from the writeHandler.
+		var errChan chan error
+		if sync {
+			errChan = make(chan error, 1)
+			errChans = append(errChans, errChan)
+		}
+
+		p.queueMsg(msg, errChan)
	}
 
-	errChan := make(chan error, 1)
-	p.queueMsg(msg, errChan)
-	select {
-	case err := <-errChan:
-		return err
-	case <-p.quit:
-		return fmt.Errorf("peer shutting down")
+	// Wait for all replies from the writeHandler. For async sends, this
+	// will be a NOP as the list of error chans is nil.
+	for _, errChan := range errChans {
+		select {
+		case err := <-errChan:
+			return err
+		case <-p.quit:
+			return ErrPeerExiting
+		}
	}
+
+	return nil
 }
 
 // PubKey returns the pubkey of the peer in compressed serialized format.
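
The reworked method queues everything first and only then waits, so async and sync sends share one code path. A hedged usage sketch:

	// Async: enqueue for the writeHandler and return immediately; no
	// error chans are created, so the wait loop is a no-op.
	_ = p.SendMessage(false, msgA, msgB)

	// Sync: block until the writeHandler replies (or the peer quits,
	// in which case ErrPeerExiting is returned).
	if err := p.SendMessage(true, msgA); err != nil {
		peerLog.Errorf("unable to send message: %v", err)
	}
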
@@ -1982,6 +1996,11 @@ func (p *peer) PubKey() [33]byte {
	return p.pubKeyBytes
 }
 
+// IdentityKey returns the public key of the remote peer.
+func (p *peer) IdentityKey() *btcec.PublicKey {
+	return p.addr.IdentityKey
+}
+
 // TODO(roasbeef): make all start/stop mutexes a CAS
 
 // fetchLastChanUpdate returns a function which is able to retrieve the last
server.go (20 changed lines)
@@ -24,6 +24,7 @@ import (
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/discovery"
 	"github.com/lightningnetwork/lnd/htlcswitch"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
@@ -435,12 +436,15 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
	}
 
	s.authGossiper, err = discovery.New(discovery.Config{
		Router:     s.chanRouter,
		Notifier:   s.cc.chainNotifier,
		ChainHash:  *activeNetParams.GenesisHash,
		Broadcast:  s.BroadcastMessage,
		ChanSeries: &chanSeries{s.chanDB.ChannelGraph()},
		SendToPeer: s.SendToPeer,
+		FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) {
+			return s.FindPeer(pub)
+		},
		NotifyWhenOnline: s.NotifyWhenOnline,
		ProofMatureDelta: 0,
		TrickleDelay:     time.Millisecond * time.Duration(cfg.TrickleDelay),
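
This FindPeer closure is the hook that gives the commit its name: instead of funneling every gossip reply through s.SendToPeer, the gossiper can resolve the live lnpeer.Peer for a public key and write to it directly. A hedged sketch of how such a direct send inside the gossiper might look (sendDirect is hypothetical; FindPeer and SendMessage are the real hooks):

	// Hypothetical direct-send path inside the gossiper.
	func (d *AuthenticatedGossiper) sendDirect(target *btcec.PublicKey,
		msgs ...lnwire.Message) error {

		// Resolve the registered peer; this fails if the peer is
		// offline or the server is shutting down.
		peer, err := d.cfg.FindPeer(target)
		if err != nil {
			return err
		}

		// Bypass the server and write straight to the peer.
		return peer.SendMessage(false, msgs...)
	}
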
@@ -1882,7 +1886,7 @@ func (s *server) addPeer(p *peer) {
	// TODO(roasbeef): craft s.t. we only get updates from a few
	// peers
	recvUpdates := !cfg.NoChanUpdates
-	go s.authGossiper.InitSyncState(p.addr.IdentityKey, recvUpdates)
+	go s.authGossiper.InitSyncState(p, recvUpdates)
 
	// If the remote peer has the initial sync feature bit set, then we'll
	// being the synchronization protocol to exchange authenticated channel
@@ -1892,7 +1896,7 @@ func (s *server) addPeer(p *peer) {
		srvrLog.Infof("Requesting full table sync with %x",
			p.pubKeyBytes[:])
 
-		go s.authGossiper.SynchronizeNode(p.addr.IdentityKey)
+		go s.authGossiper.SynchronizeNode(p)
	}
 
	// Check if there are listeners waiting for this peer to come online.