diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 96e2ddc2..ec8a0df9 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -166,6 +166,11 @@ func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint, } } +type newChannelMsg struct { + channel *channeldb.OpenChannel + err chan error +} + type testNode struct { privKey *btcec.PrivateKey addr *lnwire.NetAddress diff --git a/lnwire/commit_sig.go b/lnwire/commit_sig.go index 72c235b3..2455c016 100644 --- a/lnwire/commit_sig.go +++ b/lnwire/commit_sig.go @@ -89,7 +89,7 @@ func (c *CommitSig) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *CommitSig) TargetChanID() ChannelID { return c.ChanID } diff --git a/lnwire/revoke_and_ack.go b/lnwire/revoke_and_ack.go index f6395108..d685f0f3 100644 --- a/lnwire/revoke_and_ack.go +++ b/lnwire/revoke_and_ack.go @@ -85,7 +85,7 @@ func (c *RevokeAndAck) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *RevokeAndAck) TargetChanID() ChannelID { return c.ChanID } diff --git a/lnwire/update_add_htlc.go b/lnwire/update_add_htlc.go index b3add950..028c6320 100644 --- a/lnwire/update_add_htlc.go +++ b/lnwire/update_add_htlc.go @@ -113,7 +113,7 @@ func (c *UpdateAddHTLC) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *UpdateAddHTLC) TargetChanID() ChannelID { return c.ChanID } diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index 17fc3cd4..194f2ecd 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -89,7 +89,7 @@ func (c *UpdateFailHTLC) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *UpdateFailHTLC) TargetChanID() ChannelID { return c.ChanID } diff --git a/lnwire/update_fail_malformed_htlc.go b/lnwire/update_fail_malformed_htlc.go index 68f0a61b..39d4b870 100644 --- a/lnwire/update_fail_malformed_htlc.go +++ b/lnwire/update_fail_malformed_htlc.go @@ -77,7 +77,7 @@ func (c *UpdateFailMalformedHTLC) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *UpdateFailMalformedHTLC) TargetChanID() ChannelID { return c.ChanID } diff --git a/lnwire/update_fee.go b/lnwire/update_fee.go index 5657633b..2d27c377 100644 --- a/lnwire/update_fee.go +++ b/lnwire/update_fee.go @@ -72,7 +72,7 @@ func (c *UpdateFee) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *UpdateFee) TargetChanID() ChannelID { return c.ChanID } diff --git a/lnwire/update_fulfill_htlc.go b/lnwire/update_fulfill_htlc.go index 49344008..6c0e6339 100644 --- a/lnwire/update_fulfill_htlc.go +++ b/lnwire/update_fulfill_htlc.go @@ -82,7 +82,7 @@ func (c *UpdateFulfillHTLC) MaxPayloadLength(uint32) uint32 { // TargetChanID returns the channel id of the link for which this message is // intended. // -// NOTE: Part of lnd.LinkUpdater interface. +// NOTE: Part of peer.LinkUpdater interface. func (c *UpdateFulfillHTLC) TargetChanID() ChannelID { return c.ChanID } diff --git a/log.go b/log.go index bf194969..f809acdf 100644 --- a/log.go +++ b/log.go @@ -30,6 +30,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet/chanfunding" "github.com/lightningnetwork/lnd/monitoring" "github.com/lightningnetwork/lnd/netann" + "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/localchans" @@ -75,7 +76,6 @@ var ( // function should always be called as soon as possible to finish // setting them up properly with a root logger. ltndLog = addLndPkgLogger("LTND") - peerLog = addLndPkgLogger("PEER") rpcsLog = addLndPkgLogger("RPCS") srvrLog = addLndPkgLogger("SRVR") fndgLog = addLndPkgLogger("FNDG") @@ -122,6 +122,7 @@ func SetupLoggers(root *build.RotatingLogWriter) { AddSubLogger(root, "WTCL", wtclient.UseLogger) AddSubLogger(root, "PRNF", peernotifier.UseLogger) AddSubLogger(root, "CHFD", chanfunding.UseLogger) + AddSubLogger(root, "PEER", peer.UseLogger) AddSubLogger(root, "CHCL", chancloser.UseLogger) AddSubLogger(root, routing.Subsystem, routing.UseLogger, localchans.UseLogger) diff --git a/peer.go b/peer/brontide.go similarity index 94% rename from peer.go rename to peer/brontide.go index 4852f56f..948900f1 100644 --- a/peer.go +++ b/peer/brontide.go @@ -1,4 +1,4 @@ -package lnd +package peer import ( "bytes" @@ -29,7 +29,6 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwire" - ppeer "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/ticker" ) @@ -62,6 +61,13 @@ const ( ErrorBufferSize = 10 ) +var ( + // ErrChannelNotFound is an error returned when a channel is queried and + // either the Brontide doesn't know of it, or the channel in question + // is pending. + ErrChannelNotFound = fmt.Errorf("channel not found") +) + // outgoingMsg packages an lnwire.Message to be sent out on the wire, along with // a buffered channel which will be sent upon once the write is complete. This // buffered channel acts as a semaphore to be used for synchronization purposes. @@ -106,13 +112,13 @@ type TimestampedError struct { Timestamp time.Time } -// peer is an active peer on the Lightning Network. This struct is responsible +// Brontide is an active peer on the Lightning Network. This struct is responsible // for managing any channel state related to this peer. To do so, it has // several helper goroutines to handle events such as HTLC timeouts, new // funding workflow, and detecting an uncooperative closure of any active // channels. // TODO(roasbeef): proper reconnection logic -type peer struct { +type Brontide struct { // MUST be used atomically. started int32 disconnect int32 @@ -131,7 +137,7 @@ type peer struct { // our last ping message. To be used atomically. pingLastSend int64 - cfg ppeer.Config + cfg Config // activeSignal when closed signals that the peer is now active and // ready to process messages. @@ -141,7 +147,7 @@ type peer struct { // It will be zero for peers that did not successfully call Start(). startTime time.Time - // sendQueue is the channel which is used to queue outgoing to be + // sendQueue is the channel which is used to queue outgoing messages to be // written onto the wire. Note that this channel is unbuffered. sendQueue chan outgoingMsg @@ -213,13 +219,12 @@ type peer struct { wg sync.WaitGroup } -// A compile-time check to ensure that peer satisfies the lnpeer.Peer interface. -var _ lnpeer.Peer = (*peer)(nil) +// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer interface. +var _ lnpeer.Peer = (*Brontide)(nil) -// newPeer creates a new peer from a peer.Config object. -func newPeer(cfg ppeer.Config) *peer { - - p := &peer{ +// NewBrontide creates a new Brontide from a peer.Config struct. +func NewBrontide(cfg Config) *Brontide { + p := &Brontide{ cfg: cfg, activeSignal: make(chan struct{}), sendQueue: make(chan outgoingMsg), @@ -228,8 +233,7 @@ func newPeer(cfg ppeer.Config) *peer { activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), newChannels: make(chan *newChannelMsg, 1), - activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), - + activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser), localCloseChanReqs: make(chan *htlcswitch.ChanClose), linkFailures: make(chan linkFailureReport), @@ -244,7 +248,7 @@ func newPeer(cfg ppeer.Config) *peer { // Start starts all helper goroutines the peer needs for normal operations. In // the case this peer has already been started, then this function is a loop. -func (p *peer) Start() error { +func (p *Brontide) Start() error { if atomic.AddInt32(&p.started, 1) != 1 { return nil } @@ -366,12 +370,12 @@ func (p *peer) Start() error { // initGossipSync initializes either a gossip syncer or an initial routing // dump, depending on the negotiated synchronization method. -func (p *peer) initGossipSync() { +func (p *Brontide) initGossipSync() { // If the remote peer knows of the new gossip queries feature, then // we'll create a new gossipSyncer in the AuthenticatedGossiper for it. if p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) { - srvrLog.Infof("Negotiated chan series queries with %x", + peerLog.Infof("Negotiated chan series queries with %x", p.cfg.PubKeyBytes[:]) // Register the peer's gossip syncer with the gossiper. @@ -385,7 +389,6 @@ func (p *peer) initGossipSync() { // peers. p.cfg.AuthGossiper.InitSyncState(p) } - } // QuitSignal is a method that should return a channel which will be sent upon @@ -394,7 +397,7 @@ func (p *peer) initGossipSync() { // exits. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) QuitSignal() <-chan struct{} { +func (p *Brontide) QuitSignal() <-chan struct{} { return p.quit } @@ -402,7 +405,7 @@ func (p *peer) QuitSignal() <-chan struct{} { // channels returned by the database. It returns a slice of channel reestablish // messages that should be sent to the peer immediately, in case we have borked // channels that haven't been closed yet. -func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( +func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( []lnwire.Message, error) { // Return a slice of messages to send to the peers in case the channel @@ -537,7 +540,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( } // addLink creates and adds a new ChannelLink from the specified channel. -func (p *peer) addLink(chanPoint *wire.OutPoint, +func (p *Brontide) addLink(chanPoint *wire.OutPoint, lnChan *lnwallet.LightningChannel, forwardingPolicy *htlcswitch.ForwardingPolicy, chainEvents *contractcourt.ChainEventSubscription, @@ -563,25 +566,25 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, } } + updateContractSignals := func(signals *contractcourt.ContractSignals) error { + return p.cfg.ChainArb.UpdateContractSignals(*chanPoint, signals) + } + linkCfg := htlcswitch.ChannelLinkConfig{ - Peer: p, - DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators, - ExtractErrorEncrypter: p.cfg.Sphinx.ExtractErrorEncrypter, - FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate, - HodlMask: p.cfg.Hodl.Mask(), - Registry: p.cfg.Invoices, - Switch: p.cfg.Switch, - Circuits: p.cfg.Switch.CircuitModifier(), - ForwardPackets: p.cfg.InterceptSwitch.ForwardPackets, - FwrdingPolicy: *forwardingPolicy, - FeeEstimator: p.cfg.FeeEstimator, - PreimageCache: p.cfg.WitnessBeacon, - ChainEvents: chainEvents, - UpdateContractSignals: func(signals *contractcourt.ContractSignals) error { - return p.cfg.ChainArb.UpdateContractSignals( - *chanPoint, signals, - ) - }, + Peer: p, + DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators, + ExtractErrorEncrypter: p.cfg.Sphinx.ExtractErrorEncrypter, + FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate, + HodlMask: p.cfg.Hodl.Mask(), + Registry: p.cfg.Invoices, + Switch: p.cfg.Switch, + Circuits: p.cfg.Switch.CircuitModifier(), + ForwardPackets: p.cfg.InterceptSwitch.ForwardPackets, + FwrdingPolicy: *forwardingPolicy, + FeeEstimator: p.cfg.FeeEstimator, + PreimageCache: p.cfg.WitnessBeacon, + ChainEvents: chainEvents, + UpdateContractSignals: updateContractSignals, OnChannelFailure: onChannelFailure, SyncStates: syncStates, BatchTicker: ticker.New(50 * time.Millisecond), @@ -617,7 +620,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, // maybeSendNodeAnn sends our node announcement to the remote peer if at least // one confirmed public channel exists with them. -func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { +func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { hasConfirmedPublicChan := false for _, channel := range channels { if channel.IsPending { @@ -636,12 +639,12 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { ourNodeAnn, err := p.cfg.GenNodeAnnouncement(false) if err != nil { - srvrLog.Debugf("Unable to retrieve node announcement: %v", err) + peerLog.Debugf("Unable to retrieve node announcement: %v", err) return } if err := p.SendMessageLazy(false, &ourNodeAnn); err != nil { - srvrLog.Debugf("Unable to resend node announcement to %x: %v", + peerLog.Debugf("Unable to resend node announcement to %x: %v", p.cfg.PubKeyBytes, err) } } @@ -654,7 +657,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { // call to Start returns no error. Otherwise, if the peer fails to start, // calling Disconnect will signal the quit channel and the method will not // block, since no goroutines were spawned. -func (p *peer) WaitForDisconnect(ready chan struct{}) { +func (p *Brontide) WaitForDisconnect(ready chan struct{}) { select { case <-ready: case <-p.quit: @@ -666,7 +669,7 @@ func (p *peer) WaitForDisconnect(ready chan struct{}) { // Disconnect terminates the connection with the remote peer. Additionally, a // signal is sent to the server and htlcSwitch indicating the resources // allocated to the peer can now be cleaned up. -func (p *peer) Disconnect(reason error) { +func (p *Brontide) Disconnect(reason error) { if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { return } @@ -683,13 +686,13 @@ func (p *peer) Disconnect(reason error) { } // String returns the string representation of this peer. -func (p *peer) String() string { +func (p *Brontide) String() string { return fmt.Sprintf("%x@%s", p.cfg.PubKeyBytes, p.cfg.Conn.RemoteAddr()) } // readNextMessage reads, and returns the next message on the wire along with // any additional raw payload. -func (p *peer) readNextMessage() (lnwire.Message, error) { +func (p *Brontide) readNextMessage() (lnwire.Message, error) { noiseConn, ok := p.cfg.Conn.(*brontide.Conn) if !ok { return nil, fmt.Errorf("brontide.Conn required to read messages") @@ -725,7 +728,6 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen]) return readErr }) - atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg))) if err != nil { return nil, err @@ -752,7 +754,7 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { type msgStream struct { streamShutdown int32 // To be used atomically. - peer *peer + peer *Brontide apply func(lnwire.Message) @@ -775,7 +777,7 @@ type msgStream struct { // that should be buffered in the internal queue. Callers should set this to a // sane value that avoids blocking unnecessarily, but doesn't allow an // unbounded amount of memory to be allocated to buffer incoming messages. -func newMsgStream(p *peer, startMsg, stopMsg string, bufSize uint32, +func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32, apply func(lnwire.Message)) *msgStream { stream := &msgStream{ @@ -906,7 +908,7 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { // waitUntilLinkActive waits until the target link is active and returns a // ChannelLink to pass messages to. It accomplishes this by subscribing to // an ActiveLinkEvent which is emitted by the link when it first starts up. -func waitUntilLinkActive(p *peer, +func waitUntilLinkActive(p *Brontide, cid lnwire.ChannelID) htlcswitch.ChannelLink { // Subscribe to receive channel events. @@ -974,7 +976,7 @@ func waitUntilLinkActive(p *peer, // dispatch a message to a channel before it is fully active. A reference to the // channel this stream forwards to his held in scope to prevent unnecessary // lookups. -func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { +func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream { var chanLink htlcswitch.ChannelLink @@ -1015,14 +1017,17 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // newDiscMsgStream is used to setup a msgStream between the peer and the // authenticated gossiper. This stream should be used to forward all remote // channel announcements. -func newDiscMsgStream(p *peer) *msgStream { - return newMsgStream(p, +func newDiscMsgStream(p *Brontide) *msgStream { + apply := func(msg lnwire.Message) { + p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p) + } + + return newMsgStream( + p, "Update stream for gossiper created", "Update stream for gossiper exited", 1000, - func(msg lnwire.Message) { - p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p) - }, + apply, ) } @@ -1030,7 +1035,7 @@ func newDiscMsgStream(p *peer) *msgStream { // properly dispatching the handling of the message to the proper subsystem. // // NOTE: This method MUST be run as a goroutine. -func (p *peer) readHandler() { +func (p *Brontide) readHandler() { defer p.wg.Done() // We'll stop the timer after a new messages is received, and also @@ -1226,7 +1231,7 @@ out: // isActiveChannel returns true if the provided channel id is active, otherwise // returns false. -func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool { +func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool { p.activeChanMtx.RLock() _, ok := p.activeChannels[chanID] p.activeChanMtx.RUnlock() @@ -1237,7 +1242,7 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool { // current timestamp. Errors are only stored if we have at least one active // channel with the peer to mitigate a dos vector where a peer costlessly // connects to us and spams us with errors. -func (p *peer) storeError(err error) { +func (p *Brontide) storeError(err error) { var haveChannels bool p.activeChanMtx.RLock() @@ -1270,7 +1275,7 @@ func (p *peer) storeError(err error) { // open with the peer. // // NOTE: This method should only be called from within the readHandler. -func (p *peer) handleError(msg *lnwire.Error) bool { +func (p *Brontide) handleError(msg *lnwire.Error) bool { key := p.cfg.Addr.IdentityKey // Store the error we have received. @@ -1438,7 +1443,7 @@ func messageSummary(msg lnwire.Message) string { // less spammy log messages in trace mode by setting the 'Curve" parameter to // nil. Doing this avoids printing out each of the field elements in the curve // parameters for secp256k1. -func (p *peer) logWireMessage(msg lnwire.Message, read bool) { +func (p *Brontide) logWireMessage(msg lnwire.Message, read bool) { summaryPrefix := "Received" if !read { summaryPrefix = "Sending" @@ -1500,7 +1505,7 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) { // message buffered on the connection. It is safe to call this method again // with a nil message iff a timeout error is returned. This will continue to // flush the pending message to the wire. -func (p *peer) writeMessage(msg lnwire.Message) error { +func (p *Brontide) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { return lnpeer.ErrPeerExiting @@ -1574,7 +1579,7 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // drained. // // NOTE: This method MUST be run as a goroutine. -func (p *peer) writeHandler() { +func (p *Brontide) writeHandler() { // We'll stop the timer after a new messages is sent, and also reset it // after we process the next message. idleTimer := time.AfterFunc(idleTimeout, func() { @@ -1667,7 +1672,7 @@ out: // to be eventually sent out on the wire by the writeHandler. // // NOTE: This method MUST be run as a goroutine. -func (p *peer) queueHandler() { +func (p *Brontide) queueHandler() { defer p.wg.Done() // priorityMsgs holds an in order list of messages deemed high-priority @@ -1735,7 +1740,7 @@ func (p *peer) queueHandler() { // connection is still active. // // NOTE: This method MUST be run as a goroutine. -func (p *peer) pingHandler() { +func (p *Brontide) pingHandler() { defer p.wg.Done() pingTicker := time.NewTicker(pingInterval) @@ -1756,28 +1761,28 @@ out: } // PingTime returns the estimated ping time to the peer in microseconds. -func (p *peer) PingTime() int64 { +func (p *Brontide) PingTime() int64 { return atomic.LoadInt64(&p.pingTime) } // queueMsg adds the lnwire.Message to the back of the high priority send queue. // If the errChan is non-nil, an error is sent back if the msg failed to queue // or failed to write, and nil otherwise. -func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) { +func (p *Brontide) queueMsg(msg lnwire.Message, errChan chan error) { p.queue(true, msg, errChan) } // queueMsgLazy adds the lnwire.Message to the back of the low priority send // queue. If the errChan is non-nil, an error is sent back if the msg failed to // queue or failed to write, and nil otherwise. -func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) { +func (p *Brontide) queueMsgLazy(msg lnwire.Message, errChan chan error) { p.queue(false, msg, errChan) } // queue sends a given message to the queueHandler using the passed priority. If // the errChan is non-nil, an error is sent back if the msg failed to queue or // failed to write, and nil otherwise. -func (p *peer) queue(priority bool, msg lnwire.Message, +func (p *Brontide) queue(priority bool, msg lnwire.Message, errChan chan error) { select { @@ -1793,7 +1798,7 @@ func (p *peer) queue(priority bool, msg lnwire.Message, // ChannelSnapshots returns a slice of channel snapshots detailing all // currently active channels maintained with the remote peer. -func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { +func (p *Brontide) ChannelSnapshots() []*channeldb.ChannelSnapshot { p.activeChanMtx.RLock() defer p.activeChanMtx.RUnlock() @@ -1819,7 +1824,7 @@ func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { // genDeliveryScript returns a new script to be used to send our funds to in // the case of a cooperative channel close negotiation. -func (p *peer) genDeliveryScript() ([]byte, error) { +func (p *Brontide) genDeliveryScript() ([]byte, error) { deliveryAddr, err := p.cfg.Wallet.NewAddress( lnwallet.WitnessPubKey, false, ) @@ -1837,7 +1842,7 @@ func (p *peer) genDeliveryScript() ([]byte, error) { // channels maintained with the remote peer. // // NOTE: This method MUST be run as a goroutine. -func (p *peer) channelManager() { +func (p *Brontide) channelManager() { defer p.wg.Done() // reenableTimeout will fire once after the configured channel status @@ -2010,7 +2015,6 @@ out: reenableTimeout = nil case <-p.quit: - // As, we've been signalled to exit, we'll reset all // our active channel back to their default state. p.activeChanMtx.Lock() @@ -2033,7 +2037,7 @@ out: // peer, and reenables each public, non-pending channel. This is done at the // gossip level by broadcasting a new ChannelUpdate with the disabled bit unset. // No message will be sent if the channel is already enabled. -func (p *peer) reenableActiveChannels() { +func (p *Brontide) reenableActiveChannels() { // First, filter all known channels with this peer for ones that are // both public and not pending. var activePublicChans []wire.OutPoint @@ -2071,7 +2075,7 @@ func (p *peer) reenableActiveChannels() { for _, chanPoint := range activePublicChans { err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint) if err != nil { - srvrLog.Errorf("Unable to enable channel %v: %v", + peerLog.Errorf("Unable to enable channel %v: %v", chanPoint, err) } } @@ -2081,7 +2085,7 @@ func (p *peer) reenableActiveChannels() { // for the target channel ID. If the channel isn't active an error is returned. // Otherwise, either an existing state machine will be returned, or a new one // will be created. -func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) ( +func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( *chancloser.ChanCloser, error) { // First, we'll ensure that we actually know of the target channel. If @@ -2195,7 +2199,7 @@ func chooseDeliveryScript(upfront, // handleLocalCloseReq kicks-off the workflow to execute a cooperative or // forced unilateral closure of the channel initiated by a local subsystem. -func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { +func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) { chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) p.activeChanMtx.RLock() @@ -2316,7 +2320,7 @@ type linkFailureReport struct { // fails. It facilitates the removal of all channel state within the peer, // force closing the channel depending on severity, and sending the error // message back to the remote party. -func (p *peer) handleLinkFailure(failure linkFailureReport) { +func (p *Brontide) handleLinkFailure(failure linkFailureReport) { // We begin by wiping the link, which will remove it from the switch, // such that it won't be attempted used for any more updates. // @@ -2371,7 +2375,7 @@ func (p *peer) handleLinkFailure(failure linkFailureReport) { // machine should be passed in. Once the transaction has been sufficiently // confirmed, the channel will be marked as fully closed within the database, // and any clients will be notified of updates to the closing state. -func (p *peer) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { +func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { closeReq := chanCloser.CloseRequest() // First, we'll clear all indexes related to the channel in question. @@ -2466,7 +2470,7 @@ func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, // WipeChannel removes the passed channel point from all indexes associated with // the peer and the switch. -func (p *peer) WipeChannel(chanPoint *wire.OutPoint) { +func (p *Brontide) WipeChannel(chanPoint *wire.OutPoint) { chanID := lnwire.NewChanIDFromOutPoint(chanPoint) p.activeChanMtx.Lock() @@ -2480,7 +2484,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) { // handleInitMsg handles the incoming init message which contains global and // local feature vectors. If feature vectors are incompatible then disconnect. -func (p *peer) handleInitMsg(msg *lnwire.Init) error { +func (p *Brontide) handleInitMsg(msg *lnwire.Init) error { // First, merge any features from the legacy global features field into // those presented in the local features fields. err := msg.Features.Merge(msg.GlobalFeatures) @@ -2524,7 +2528,7 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error { // behavior off the set of negotiated feature bits. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) LocalFeatures() *lnwire.FeatureVector { +func (p *Brontide) LocalFeatures() *lnwire.FeatureVector { return p.cfg.Features } @@ -2533,13 +2537,13 @@ func (p *peer) LocalFeatures() *lnwire.FeatureVector { // their behavior off the set of negotiated feature bits. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) RemoteFeatures() *lnwire.FeatureVector { +func (p *Brontide) RemoteFeatures() *lnwire.FeatureVector { return p.remoteFeatures } // sendInitMsg sends the Init message to the remote peer. This message contains our // currently supported local and global features. -func (p *peer) sendInitMsg() error { +func (p *Brontide) sendInitMsg() error { msg := lnwire.NewInitMessage( p.cfg.LegacyFeatures.RawFeatureVector, p.cfg.Features.RawFeatureVector, @@ -2550,7 +2554,7 @@ func (p *peer) sendInitMsg() error { // resendChanSyncMsg will attempt to find a channel sync message for the closed // channel and resend it to our peer. -func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { +func (p *Brontide) resendChanSyncMsg(cid lnwire.ChannelID) error { // If we already re-sent the mssage for this channel, we won't do it // again. if _, ok := p.resentChanSyncMsg[cid]; ok { @@ -2598,7 +2602,7 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { // otherwise it returns immediately after queuing. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error { +func (p *Brontide) SendMessage(sync bool, msgs ...lnwire.Message) error { return p.sendMessage(sync, true, msgs...) } @@ -2608,7 +2612,7 @@ func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error { // otherwise it returns immediately after queueing. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { +func (p *Brontide) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { return p.sendMessage(sync, false, msgs...) } @@ -2616,7 +2620,7 @@ func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { // to the remote peer. If sync is true, this method will block until the // messages have been sent to the remote peer or an error is returned, otherwise // it returns immediately after queueing. -func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error { +func (p *Brontide) sendMessage(sync, priority bool, msgs ...lnwire.Message) error { // Add all incoming messages to the outgoing queue. A list of error // chans is populated for each message if the caller requested a sync // send. @@ -2659,21 +2663,21 @@ func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error { // PubKey returns the pubkey of the peer in compressed serialized format. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) PubKey() [33]byte { +func (p *Brontide) PubKey() [33]byte { return p.cfg.PubKeyBytes } // IdentityKey returns the public key of the remote peer. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) IdentityKey() *btcec.PublicKey { +func (p *Brontide) IdentityKey() *btcec.PublicKey { return p.cfg.Addr.IdentityKey } // Address returns the network address of the remote peer. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) Address() net.Addr { +func (p *Brontide) Address() net.Addr { return p.cfg.Addr.Address } @@ -2681,7 +2685,7 @@ func (p *peer) Address() net.Addr { // added if the cancel channel is closed. // // NOTE: Part of the lnpeer.Peer interface. -func (p *peer) AddNewChannel(channel *channeldb.OpenChannel, +func (p *Brontide) AddNewChannel(channel *channeldb.OpenChannel, cancel <-chan struct{}) error { errChan := make(chan error, 1) @@ -2710,14 +2714,14 @@ func (p *peer) AddNewChannel(channel *channeldb.OpenChannel, // StartTime returns the time at which the connection was established if the // peer started successfully, and zero otherwise. -func (p *peer) StartTime() time.Time { +func (p *Brontide) StartTime() time.Time { return p.startTime } // handleCloseMsg is called when a new cooperative channel closure related // message is received from the remote peer. We'll use this message to advance // the chan closer state machine. -func (p *peer) handleCloseMsg(msg *closeMsg) { +func (p *Brontide) handleCloseMsg(msg *closeMsg) { // We'll now fetch the matching closing state machine in order to continue, // or finalize the channel closure process. chanCloser, err := p.fetchActiveChanCloser(msg.cid) @@ -2778,7 +2782,7 @@ func (p *peer) handleCloseMsg(msg *closeMsg) { // HandleLocalCloseChanReqs accepts a *htlcswitch.ChanClose and passes it onto // the channelManager goroutine, which will shut down the link and possibly // close the channel. -func (p *peer) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) { +func (p *Brontide) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) { select { case p.localCloseChanReqs <- req: peerLog.Infof("Local close channel request delivered to peer: %v", @@ -2790,56 +2794,46 @@ func (p *peer) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) { } // NetAddress returns the network of the remote peer as an lnwire.NetAddress. -func (p *peer) NetAddress() *lnwire.NetAddress { +func (p *Brontide) NetAddress() *lnwire.NetAddress { return p.cfg.Addr } -// Inbound returns cfg.Inbound. -func (p *peer) Inbound() bool { +// Inbound is a getter for the Brontide's Inbound boolean in cfg. +func (p *Brontide) Inbound() bool { return p.cfg.Inbound } -// ConnReq returns cfg.ConnReq. -func (p *peer) ConnReq() *connmgr.ConnReq { +// ConnReq is a getter for the Brontide's connReq in cfg. +func (p *Brontide) ConnReq() *connmgr.ConnReq { return p.cfg.ConnReq } -// ErrorBuffer returns cfg.ErrorBuffer. -func (p *peer) ErrorBuffer() *queue.CircularBuffer { +// ErrorBuffer is a getter for the Brontide's errorBuffer in cfg. +func (p *Brontide) ErrorBuffer() *queue.CircularBuffer { return p.cfg.ErrorBuffer } // SetAddress sets the remote peer's address given an address. -func (p *peer) SetAddress(address net.Addr) { +func (p *Brontide) SetAddress(address net.Addr) { p.cfg.Addr.Address = address } // ActiveSignal returns the peer's active signal. -func (p *peer) ActiveSignal() chan struct{} { +func (p *Brontide) ActiveSignal() chan struct{} { return p.activeSignal } // Conn returns a pointer to the peer's connection struct. -func (p *peer) Conn() net.Conn { +func (p *Brontide) Conn() net.Conn { return p.cfg.Conn } // BytesReceived returns the number of bytes received from the peer. -func (p *peer) BytesReceived() uint64 { +func (p *Brontide) BytesReceived() uint64 { return atomic.LoadUint64(&p.bytesReceived) } // BytesSent returns the number of bytes sent to the peer. -func (p *peer) BytesSent() uint64 { +func (p *Brontide) BytesSent() uint64 { return atomic.LoadUint64(&p.bytesSent) } - -// LinkUpdater is an interface implemented by most messages in BOLT 2 that are -// allowed to update the channel state. -type LinkUpdater interface { - // TargetChanID returns the channel id of the link for which this - // message is intended. - TargetChanID() lnwire.ChannelID -} - -// TODO(roasbeef): make all start/stop mutexes a CAS diff --git a/peer_test.go b/peer/brontide_test.go similarity index 99% rename from peer_test.go rename to peer/brontide_test.go index adbb6570..78800b11 100644 --- a/peer_test.go +++ b/peer/brontide_test.go @@ -1,6 +1,4 @@ -// +build !rpctest - -package lnd +package peer import ( "bytes" @@ -35,7 +33,7 @@ var ( func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { t.Parallel() - notifier := &mockNotfier{ + notifier := &mockNotifier{ confChannel: make(chan *chainntnfs.TxConfirmation), } broadcastTxChan := make(chan *wire.MsgTx) @@ -136,7 +134,7 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { t.Parallel() - notifier := &mockNotfier{ + notifier := &mockNotifier{ confChannel: make(chan *chainntnfs.TxConfirmation), } broadcastTxChan := make(chan *wire.MsgTx) @@ -229,7 +227,6 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { } // Alice should respond with the ClosingSigned they both agreed upon. - select { case outMsg := <-alicePeer.outgoingQueue: msg = outMsg.msg @@ -257,7 +254,7 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { t.Parallel() - notifier := &mockNotfier{ + notifier := &mockNotifier{ confChannel: make(chan *chainntnfs.TxConfirmation), } broadcastTxChan := make(chan *wire.MsgTx) @@ -449,7 +446,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { t.Parallel() - notifier := &mockNotfier{ + notifier := &mockNotifier{ confChannel: make(chan *chainntnfs.TxConfirmation), } broadcastTxChan := make(chan *wire.MsgTx) @@ -782,7 +779,7 @@ func TestCustomShutdownScript(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { - notifier := &mockNotfier{ + notifier := &mockNotifier{ confChannel: make(chan *chainntnfs.TxConfirmation), } broadcastTxChan := make(chan *wire.MsgTx) diff --git a/peer/log.go b/peer/log.go new file mode 100644 index 00000000..a1f9bda3 --- /dev/null +++ b/peer/log.go @@ -0,0 +1,40 @@ +package peer + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// peerLog is a logger that is initialized with the btclog.Disabled logger. +var peerLog btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("PEER", nil)) +} + +// DisableLog disables all logging output. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + peerLog = logger +} + +// logClosure is used to provide a closure over expensive logging operations +// so they aren't performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/peer/test_utils.go b/peer/test_utils.go new file mode 100644 index 00000000..df74eda1 --- /dev/null +++ b/peer/test_utils.go @@ -0,0 +1,695 @@ +package peer + +import ( + "bytes" + crand "crypto/rand" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "os" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcwallet/wallet/txauthor" + "github.com/btcsuite/btcwallet/wtxmgr" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/netann" + "github.com/lightningnetwork/lnd/queue" + "github.com/lightningnetwork/lnd/shachain" + "github.com/lightningnetwork/lnd/ticker" +) + +const ( + broadcastHeight = 100 +) + +var ( + alicesPrivKey = []byte{ + 0x2b, 0xd8, 0x06, 0xc9, 0x7f, 0x0e, 0x00, 0xaf, + 0x1a, 0x1f, 0xc3, 0x32, 0x8f, 0xa7, 0x63, 0xa9, + 0x26, 0x97, 0x23, 0xc8, 0xdb, 0x8f, 0xac, 0x4f, + 0x93, 0xaf, 0x71, 0xdb, 0x18, 0x6d, 0x6e, 0x90, + } + + bobsPrivKey = []byte{ + 0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda, + 0x63, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17, + 0xd, 0xe7, 0x95, 0xe4, 0xb7, 0x25, 0xb8, 0x4d, + 0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9, + } + + // Use a hard-coded HD seed. + testHdSeed = [32]byte{ + 0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab, + 0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4, + 0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9, + 0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, + } + + // Just use some arbitrary bytes as delivery script. + dummyDeliveryScript = alicesPrivKey + + // testTx is used as the default funding txn for single-funder channels. + testTx = &wire.MsgTx{ + Version: 1, + TxIn: []*wire.TxIn{ + { + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: 0xffffffff, + }, + SignatureScript: []byte{0x04, 0x31, 0xdc, 0x00, 0x1b, 0x01, 0x62}, + Sequence: 0xffffffff, + }, + }, + TxOut: []*wire.TxOut{ + { + Value: 5000000000, + PkScript: []byte{ + 0x41, // OP_DATA_65 + 0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5, + 0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42, + 0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1, + 0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24, + 0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97, + 0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78, + 0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20, + 0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63, + 0xa6, // 65-byte signature + 0xac, // OP_CHECKSIG + }, + }, + }, + LockTime: 5, + } +) + +// noUpdate is a function which can be used as a parameter in createTestPeer to +// call the setup code with no custom values on the channels set up. +var noUpdate = func(a, b *channeldb.OpenChannel) {} + +type mockSigner struct { + key *btcec.PrivateKey +} + +func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx, + signDesc *input.SignDescriptor) (input.Signature, error) { + amt := signDesc.Output.Value + witnessScript := signDesc.WitnessScript + privKey := m.key + + if !privKey.PubKey().IsEqual(signDesc.KeyDesc.PubKey) { + return nil, fmt.Errorf("incorrect key passed") + } + + switch { + case signDesc.SingleTweak != nil: + privKey = input.TweakPrivKey(privKey, + signDesc.SingleTweak) + case signDesc.DoubleTweak != nil: + privKey = input.DeriveRevocationPrivKey(privKey, + signDesc.DoubleTweak) + } + + sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes, + signDesc.InputIndex, amt, witnessScript, signDesc.HashType, + privKey) + if err != nil { + return nil, err + } + + return btcec.ParseDERSignature(sig[:len(sig)-1], btcec.S256()) +} + +func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx, + signDesc *input.SignDescriptor) (*input.Script, error) { + + // TODO(roasbeef): expose tweaked signer from lnwallet so don't need to + // duplicate this code? + + privKey := m.key + + switch { + case signDesc.SingleTweak != nil: + privKey = input.TweakPrivKey(privKey, + signDesc.SingleTweak) + case signDesc.DoubleTweak != nil: + privKey = input.DeriveRevocationPrivKey(privKey, + signDesc.DoubleTweak) + } + + witnessScript, err := txscript.WitnessSignature(tx, signDesc.SigHashes, + signDesc.InputIndex, signDesc.Output.Value, signDesc.Output.PkScript, + signDesc.HashType, privKey, true) + if err != nil { + return nil, err + } + + return &input.Script{ + Witness: witnessScript, + }, nil +} + +var _ input.Signer = (*mockSigner)(nil) + +type mockChainIO struct { + bestHeight int32 +} + +func (m *mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) { + return nil, m.bestHeight, nil +} + +func (*mockChainIO) GetUtxo(op *wire.OutPoint, _ []byte, + heightHint uint32, _ <-chan struct{}) (*wire.TxOut, error) { + return nil, nil +} + +func (*mockChainIO) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { + return nil, nil +} + +func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) { + return nil, nil +} + +var _ lnwallet.BlockChainIO = (*mockChainIO)(nil) + +type mockWalletController struct { + rootKey *btcec.PrivateKey + publishedTxns chan *wire.MsgTx +} + +func (*mockWalletController) FetchInputInfo(prevOut *wire.OutPoint) ( + *lnwallet.Utxo, error) { + + return nil, nil +} + +func (*mockWalletController) ConfirmedBalance(confs int32) (btcutil.Amount, + error) { + + return 0, nil +} + +func (m *mockWalletController) NewAddress(addrType lnwallet.AddressType, + change bool) (btcutil.Address, error) { + + addr, _ := btcutil.NewAddressPubKey( + m.rootKey.PubKey().SerializeCompressed(), &chaincfg.MainNetParams, + ) + return addr, nil +} + +func (*mockWalletController) LastUnusedAddress(addrType lnwallet.AddressType) ( + btcutil.Address, error) { + + return nil, nil +} + +func (*mockWalletController) IsOurAddress(a btcutil.Address) bool { + return false +} + +func (*mockWalletController) SendOutputs(outputs []*wire.TxOut, + feeRate chainfee.SatPerKWeight, label string) (*wire.MsgTx, error) { + + return nil, nil +} + +func (*mockWalletController) CreateSimpleTx(outputs []*wire.TxOut, + feeRate chainfee.SatPerKWeight, dryRun bool) (*txauthor.AuthoredTx, error) { + + return nil, nil +} + +func (*mockWalletController) ListUnspentWitness(minconfirms, + maxconfirms int32) ([]*lnwallet.Utxo, error) { + + return nil, nil +} + +func (*mockWalletController) ListTransactionDetails(startHeight, + endHeight int32) ([]*lnwallet.TransactionDetail, error) { + + return nil, nil +} + +func (*mockWalletController) LockOutpoint(o wire.OutPoint) {} + +func (*mockWalletController) UnlockOutpoint(o wire.OutPoint) {} + +func (m *mockWalletController) PublishTransaction(tx *wire.MsgTx, + label string) error { + m.publishedTxns <- tx + return nil +} + +func (*mockWalletController) LabelTransaction(hash chainhash.Hash, + label string, overwrite bool) error { + + return nil +} + +func (*mockWalletController) SubscribeTransactions() ( + lnwallet.TransactionSubscription, error) { + + return nil, nil +} + +func (*mockWalletController) IsSynced() (bool, int64, error) { + return false, 0, nil +} + +func (*mockWalletController) Start() error { + return nil +} + +func (*mockWalletController) Stop() error { + return nil +} + +func (*mockWalletController) BackEnd() string { + return "" +} + +func (*mockWalletController) LeaseOutput(wtxmgr.LockID, + wire.OutPoint) (time.Time, error) { + + return time.Now(), nil +} + +func (*mockWalletController) ReleaseOutput(wtxmgr.LockID, wire.OutPoint) error { + return nil +} + +func (*mockWalletController) GetRecoveryInfo() (bool, float64, error) { + return false, 0, nil +} + +var _ lnwallet.WalletController = (*mockWalletController)(nil) + +type mockNotifier struct { + confChannel chan *chainntnfs.TxConfirmation +} + +func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, + error) { + + return &chainntnfs.ConfirmationEvent{ + Confirmed: m.confChannel, + }, nil +} + +func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, + heightHint uint32) (*chainntnfs.SpendEvent, error) { + + return &chainntnfs.SpendEvent{ + Spend: make(chan *chainntnfs.SpendDetail), + Cancel: func() {}, + }, nil +} + +func (m *mockNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + + return &chainntnfs.BlockEpochEvent{ + Epochs: make(chan *chainntnfs.BlockEpoch), + Cancel: func() {}, + }, nil +} + +func (m *mockNotifier) Start() error { + return nil +} + +func (m *mockNotifier) Stop() error { + return nil +} + +func (m *mockNotifier) Started() bool { + return true +} + +var _ chainntnfs.ChainNotifier = (*mockNotifier)(nil) + +// createTestPeer creates a channel between two nodes, and returns a peer for +// one of the nodes, together with the channel seen from both nodes. It takes +// an updateChan function which can be used to modify the default values on +// the channel states for each peer. +func createTestPeer(notifier chainntnfs.ChainNotifier, + publTx chan *wire.MsgTx, updateChan func(a, b *channeldb.OpenChannel)) ( + *Brontide, *lnwallet.LightningChannel, func(), error) { + + aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes( + btcec.S256(), alicesPrivKey, + ) + aliceKeySigner := &keychain.PrivKeyDigestSigner{PrivKey: aliceKeyPriv} + bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes( + btcec.S256(), bobsPrivKey, + ) + + channelCapacity := btcutil.Amount(10 * 1e8) + channelBal := channelCapacity / 2 + aliceDustLimit := btcutil.Amount(200) + bobDustLimit := btcutil.Amount(1300) + csvTimeoutAlice := uint32(5) + csvTimeoutBob := uint32(4) + + prevOut := &wire.OutPoint{ + Hash: chainhash.Hash(testHdSeed), + Index: 0, + } + fundingTxIn := wire.NewTxIn(prevOut, nil, nil) + + aliceCfg := channeldb.ChannelConfig{ + ChannelConstraints: channeldb.ChannelConstraints{ + DustLimit: aliceDustLimit, + MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), + ChanReserve: btcutil.Amount(rand.Int63()), + MinHTLC: lnwire.MilliSatoshi(rand.Int63()), + MaxAcceptedHtlcs: uint16(rand.Int31()), + CsvDelay: uint16(csvTimeoutAlice), + }, + MultiSigKey: keychain.KeyDescriptor{ + PubKey: aliceKeyPub, + }, + RevocationBasePoint: keychain.KeyDescriptor{ + PubKey: aliceKeyPub, + }, + PaymentBasePoint: keychain.KeyDescriptor{ + PubKey: aliceKeyPub, + }, + DelayBasePoint: keychain.KeyDescriptor{ + PubKey: aliceKeyPub, + }, + HtlcBasePoint: keychain.KeyDescriptor{ + PubKey: aliceKeyPub, + }, + } + bobCfg := channeldb.ChannelConfig{ + ChannelConstraints: channeldb.ChannelConstraints{ + DustLimit: bobDustLimit, + MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), + ChanReserve: btcutil.Amount(rand.Int63()), + MinHTLC: lnwire.MilliSatoshi(rand.Int63()), + MaxAcceptedHtlcs: uint16(rand.Int31()), + CsvDelay: uint16(csvTimeoutBob), + }, + MultiSigKey: keychain.KeyDescriptor{ + PubKey: bobKeyPub, + }, + RevocationBasePoint: keychain.KeyDescriptor{ + PubKey: bobKeyPub, + }, + PaymentBasePoint: keychain.KeyDescriptor{ + PubKey: bobKeyPub, + }, + DelayBasePoint: keychain.KeyDescriptor{ + PubKey: bobKeyPub, + }, + HtlcBasePoint: keychain.KeyDescriptor{ + PubKey: bobKeyPub, + }, + } + + bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize()) + if err != nil { + return nil, nil, nil, err + } + bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot) + bobFirstRevoke, err := bobPreimageProducer.AtIndex(0) + if err != nil { + return nil, nil, nil, err + } + bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:]) + + aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize()) + if err != nil { + return nil, nil, nil, err + } + alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot) + aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0) + if err != nil { + return nil, nil, nil, err + } + aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:]) + + aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns( + channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint, + bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit, + ) + if err != nil { + return nil, nil, nil, err + } + + alicePath, err := ioutil.TempDir("", "alicedb") + if err != nil { + return nil, nil, nil, err + } + + dbAlice, err := channeldb.Open(alicePath) + if err != nil { + return nil, nil, nil, err + } + + bobPath, err := ioutil.TempDir("", "bobdb") + if err != nil { + return nil, nil, nil, err + } + + dbBob, err := channeldb.Open(bobPath) + if err != nil { + return nil, nil, nil, err + } + + estimator := chainfee.NewStaticEstimator(12500, 0) + feePerKw, err := estimator.EstimateFeePerKW(1) + if err != nil { + return nil, nil, nil, err + } + + // TODO(roasbeef): need to factor in commit fee? + aliceCommit := channeldb.ChannelCommitment{ + CommitHeight: 0, + LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), + RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal), + FeePerKw: btcutil.Amount(feePerKw), + CommitFee: feePerKw.FeeForWeight(input.CommitWeight), + CommitTx: aliceCommitTx, + CommitSig: bytes.Repeat([]byte{1}, 71), + } + bobCommit := channeldb.ChannelCommitment{ + CommitHeight: 0, + LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), + RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal), + FeePerKw: btcutil.Amount(feePerKw), + CommitFee: feePerKw.FeeForWeight(input.CommitWeight), + CommitTx: bobCommitTx, + CommitSig: bytes.Repeat([]byte{1}, 71), + } + + var chanIDBytes [8]byte + if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil { + return nil, nil, nil, err + } + + shortChanID := lnwire.NewShortChanIDFromInt( + binary.BigEndian.Uint64(chanIDBytes[:]), + ) + + aliceChannelState := &channeldb.OpenChannel{ + LocalChanCfg: aliceCfg, + RemoteChanCfg: bobCfg, + IdentityPub: aliceKeyPub, + FundingOutpoint: *prevOut, + ShortChannelID: shortChanID, + ChanType: channeldb.SingleFunderTweaklessBit, + IsInitiator: true, + Capacity: channelCapacity, + RemoteCurrentRevocation: bobCommitPoint, + RevocationProducer: alicePreimageProducer, + RevocationStore: shachain.NewRevocationStore(), + LocalCommitment: aliceCommit, + RemoteCommitment: aliceCommit, + Db: dbAlice, + Packager: channeldb.NewChannelPackager(shortChanID), + FundingTxn: testTx, + } + bobChannelState := &channeldb.OpenChannel{ + LocalChanCfg: bobCfg, + RemoteChanCfg: aliceCfg, + IdentityPub: bobKeyPub, + FundingOutpoint: *prevOut, + ChanType: channeldb.SingleFunderTweaklessBit, + IsInitiator: false, + Capacity: channelCapacity, + RemoteCurrentRevocation: aliceCommitPoint, + RevocationProducer: bobPreimageProducer, + RevocationStore: shachain.NewRevocationStore(), + LocalCommitment: bobCommit, + RemoteCommitment: bobCommit, + Db: dbBob, + Packager: channeldb.NewChannelPackager(shortChanID), + } + + // Set custom values on the channel states. + updateChan(aliceChannelState, bobChannelState) + + aliceAddr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + + if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil { + return nil, nil, nil, err + } + + bobAddr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + + if err := bobChannelState.SyncPending(bobAddr, 0); err != nil { + return nil, nil, nil, err + } + + cleanUpFunc := func() { + os.RemoveAll(bobPath) + os.RemoveAll(alicePath) + } + + aliceSigner := &mockSigner{aliceKeyPriv} + bobSigner := &mockSigner{bobKeyPriv} + + alicePool := lnwallet.NewSigPool(1, aliceSigner) + channelAlice, err := lnwallet.NewLightningChannel( + aliceSigner, aliceChannelState, alicePool, + ) + if err != nil { + return nil, nil, nil, err + } + _ = alicePool.Start() + + bobPool := lnwallet.NewSigPool(1, bobSigner) + channelBob, err := lnwallet.NewLightningChannel( + bobSigner, bobChannelState, bobPool, + ) + if err != nil { + return nil, nil, nil, err + } + _ = bobPool.Start() + + chainIO := &mockChainIO{ + bestHeight: broadcastHeight, + } + wallet := &lnwallet.LightningWallet{ + WalletController: &mockWalletController{ + rootKey: aliceKeyPriv, + publishedTxns: publTx, + }, + } + + _, currentHeight, err := chainIO.GetBestBlock() + if err != nil { + return nil, nil, nil, err + } + + htlcSwitch, err := htlcswitch.New(htlcswitch.Config{ + DB: dbAlice, + SwitchPackager: channeldb.NewSwitchPackager(), + Notifier: notifier, + FwdEventTicker: ticker.New( + htlcswitch.DefaultFwdEventInterval), + LogEventTicker: ticker.New( + htlcswitch.DefaultLogInterval), + AckEventTicker: ticker.New( + htlcswitch.DefaultAckInterval), + }, uint32(currentHeight)) + if err != nil { + return nil, nil, nil, err + } + if err = htlcSwitch.Start(); err != nil { + return nil, nil, nil, err + } + + nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner) + + const chanActiveTimeout = time.Minute + + chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{ + ChanStatusSampleInterval: 30 * time.Second, + ChanEnableTimeout: chanActiveTimeout, + ChanDisableTimeout: 2 * time.Minute, + DB: dbAlice, + Graph: dbAlice.ChannelGraph(), + MessageSigner: nodeSignerAlice, + OurPubKey: aliceKeyPub, + IsChannelActive: htlcSwitch.HasActiveLink, + ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil }, + }) + if err != nil { + return nil, nil, nil, err + } + if err = chanStatusMgr.Start(); err != nil { + return nil, nil, nil, err + } + + errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize) + if err != nil { + return nil, nil, nil, err + } + + var pubKey [33]byte + copy(pubKey[:], aliceKeyPub.SerializeCompressed()) + + cfgAddr := &lnwire.NetAddress{ + IdentityKey: aliceKeyPub, + Address: aliceAddr, + ChainNet: wire.SimNet, + } + + cfg := &Config{ + Addr: cfgAddr, + PubKeyBytes: pubKey, + ErrorBuffer: errBuffer, + ChainIO: chainIO, + Switch: htlcSwitch, + + ChanActiveTimeout: chanActiveTimeout, + InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch), + + ChannelDB: dbAlice, + FeeEstimator: estimator, + Wallet: wallet, + ChainNotifier: notifier, + ChanStatusMgr: chanStatusMgr, + DisconnectPeer: func(b *btcec.PublicKey) error { return nil }, + } + + alicePeer := NewBrontide(*cfg) + + chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint()) + alicePeer.activeChannels[chanID] = channelAlice + + alicePeer.wg.Add(1) + go alicePeer.channelManager() + + return alicePeer, channelBob, cleanUpFunc, nil +} diff --git a/rpcserver.go b/rpcserver.go index 3517eb22..be27c48a 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -57,6 +57,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/monitoring" + "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/routing" @@ -2105,17 +2106,17 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, // With the transaction broadcast, we send our first update to // the client. updateChan = make(chan interface{}, 2) - updateChan <- &PendingUpdate{ + updateChan <- &peer.PendingUpdate{ Txid: closingTxid[:], } errChan = make(chan error, 1) notifier := r.server.cc.chainNotifier - go WaitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint, + go peer.WaitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() { // Respond to the local subsystem which // requested the channel closure. - updateChan <- &ChannelCloseUpdate{ + updateChan <- &peer.ChannelCloseUpdate{ ClosingTxid: closingTxid[:], Success: true, } @@ -2228,7 +2229,7 @@ out: // then we can break out of our dispatch loop as we no // longer need to process any further updates. switch closeUpdate := closingUpdate.(type) { - case *ChannelCloseUpdate: + case *peer.ChannelCloseUpdate: h, _ := chainhash.NewHash(closeUpdate.ClosingTxid) rpcsLog.Infof("[closechannel] close completed: "+ "txid(%v)", h) @@ -2246,7 +2247,7 @@ func createRPCCloseUpdate(update interface{}) ( *lnrpc.CloseStatusUpdate, error) { switch u := update.(type) { - case *ChannelCloseUpdate: + case *peer.ChannelCloseUpdate: return &lnrpc.CloseStatusUpdate{ Update: &lnrpc.CloseStatusUpdate_ChanClose{ ChanClose: &lnrpc.ChannelCloseUpdate{ @@ -2254,7 +2255,7 @@ func createRPCCloseUpdate(update interface{}) ( }, }, }, nil - case *PendingUpdate: + case *peer.PendingUpdate: return &lnrpc.CloseStatusUpdate{ Update: &lnrpc.CloseStatusUpdate_ClosePending{ ClosePending: &lnrpc.PendingUpdate{ @@ -2571,7 +2572,7 @@ func (r *rpcServer) ListPeers(ctx context.Context, serverPeer.RemoteFeatures(), ) - peer := &lnrpc.Peer{ + rpcPeer := &lnrpc.Peer{ PubKey: hex.EncodeToString(nodePub[:]), Address: serverPeer.Conn().RemoteAddr().String(), Inbound: serverPeer.Inbound(), @@ -2601,17 +2602,17 @@ func (r *rpcServer) ListPeers(ctx context.Context, // Add the relevant peer errors to our response. for _, error := range peerErrors { - tsError := error.(*TimestampedError) + tsError := error.(*peer.TimestampedError) rpcErr := &lnrpc.TimestampedError{ Timestamp: uint64(tsError.Timestamp.Unix()), Error: tsError.Error.Error(), } - peer.Errors = append(peer.Errors, rpcErr) + rpcPeer.Errors = append(rpcPeer.Errors, rpcErr) } - resp.Peers = append(resp.Peers, peer) + resp.Peers = append(resp.Peers, rpcPeer) } rpcsLog.Debugf("[listpeers] yielded %v peers", serverPeers) diff --git a/server.go b/server.go index 28852df2..0d113831 100644 --- a/server.go +++ b/server.go @@ -52,7 +52,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/netann" - ppeer "github.com/lightningnetwork/lnd/peer" + "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/queue" @@ -114,7 +114,7 @@ var ( // errPeerAlreadyConnected is an error returned by the server when we're // commanded to connect to a peer, but they're already connected. type errPeerAlreadyConnected struct { - peer *peer + peer *peer.Brontide } // Error returns the human readable version of this error type. @@ -168,10 +168,10 @@ type server struct { lastDetectedIP net.IP mu sync.RWMutex - peersByPub map[string]*peer + peersByPub map[string]*peer.Brontide - inboundPeers map[string]*peer - outboundPeers map[string]*peer + inboundPeers map[string]*peer.Brontide + outboundPeers map[string]*peer.Brontide peerConnectedListeners map[string][]chan<- lnpeer.Peer peerDisconnectedListeners map[string][]chan<- struct{} @@ -191,7 +191,7 @@ type server struct { // a disconnect. Adding a peer to this map causes the peer termination // watcher to short circuit in the event that peers are purposefully // disconnected. - ignorePeerTermination map[*peer]struct{} + ignorePeerTermination map[*peer.Brontide]struct{} // scheduledPeerConnection maps a pubkey string to a callback that // should be executed in the peerTerminationWatcher the prior peer with @@ -453,12 +453,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB, persistentConnReqs: make(map[string][]*connmgr.ConnReq), persistentRetryCancels: make(map[string]chan struct{}), peerErrors: make(map[string]*queue.CircularBuffer), - ignorePeerTermination: make(map[*peer]struct{}), + ignorePeerTermination: make(map[*peer.Brontide]struct{}), scheduledPeerConnection: make(map[string]func()), - peersByPub: make(map[string]*peer), - inboundPeers: make(map[string]*peer), - outboundPeers: make(map[string]*peer), + peersByPub: make(map[string]*peer.Brontide), + inboundPeers: make(map[string]*peer.Brontide), + outboundPeers: make(map[string]*peer.Brontide), peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerDisconnectedListeners: make(map[string][]chan<- struct{}), @@ -2309,7 +2309,7 @@ func (s *server) BroadcastMessage(skips map[route.Vertex]struct{}, // peersByPub throughout this process to ensure we deliver messages to // exact set of peers present at the time of invocation. s.mu.RLock() - peers := make([]*peer, 0, len(s.peersByPub)) + peers := make([]*peer.Brontide, 0, len(s.peersByPub)) for _, sPeer := range s.peersByPub { if skips != nil { if _, ok := skips[sPeer.PubKey()]; ok { @@ -2412,7 +2412,7 @@ func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} { // daemon's local representation of the remote peer. // // NOTE: This function is safe for concurrent access. -func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) { +func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -2426,7 +2426,7 @@ func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) { // public key. // // NOTE: This function is safe for concurrent access. -func (s *server) FindPeerByPubStr(pubStr string) (*peer, error) { +func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -2435,7 +2435,7 @@ func (s *server) FindPeerByPubStr(pubStr string) (*peer, error) { // findPeerByPubStr is an internal method that retrieves the specified peer from // the server's internal state using. -func (s *server) findPeerByPubStr(pubStr string) (*peer, error) { +func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) { peer, ok := s.peersByPub[pubStr] if !ok { return nil, ErrPeerNotConnected @@ -2785,7 +2785,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, errBuffer, ok := s.peerErrors[pkStr] if !ok { var err error - errBuffer, err = queue.NewCircularBuffer(ErrorBufferSize) + errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize) if err != nil { srvrLog.Errorf("unable to create peer %v", err) return @@ -2798,7 +2798,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // offered that would trigger channel closure. In case of outgoing // htlcs, an extra block is added to prevent the channel from being // closed when the htlc is outstanding and a new block comes in. - pCfg := ppeer.Config{ + pCfg := peer.Config{ Conn: conn, ConnReq: connReq, Addr: peerAddr, @@ -2853,7 +2853,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed()) copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed()) - p := newPeer(pCfg) + p := peer.NewBrontide(pCfg) // TODO(roasbeef): update IP address for link-node // * also mark last-seen, do it one single transaction? @@ -2874,7 +2874,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // addPeer adds the passed peer to the server's global state of all active // peers. -func (s *server) addPeer(p *peer) { +func (s *server) addPeer(p *peer.Brontide) { if p == nil { return } @@ -2890,7 +2890,7 @@ func (s *server) addPeer(p *peer) { // TODO(roasbeef): pipe all requests through to the // queryHandler/peerManager - pubSer := p.NetAddress().IdentityKey.SerializeCompressed() + pubSer := p.IdentityKey().SerializeCompressed() pubStr := string(pubSer) s.peersByPub[pubStr] = p @@ -2918,7 +2918,7 @@ func (s *server) addPeer(p *peer) { // be signaled of the new peer once the method returns. // // NOTE: This MUST be launched as a goroutine. -func (s *server) peerInitializer(p *peer) { +func (s *server) peerInitializer(p *peer.Brontide) { defer s.wg.Done() // Avoid initializing peers while the server is exiting. @@ -2979,7 +2979,7 @@ func (s *server) peerInitializer(p *peer) { // successfully, otherwise the peer should be disconnected instead. // // NOTE: This MUST be launched as a goroutine. -func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { +func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { defer s.wg.Done() p.WaitForDisconnect(ready) @@ -3149,7 +3149,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // removePeer removes the passed peer from the server's state of all active // peers. -func (s *server) removePeer(p *peer) { +func (s *server) removePeer(p *peer.Brontide) { if p == nil { return } @@ -3405,8 +3405,8 @@ func (s *server) OpenChannel( // We'll wait until the peer is active before beginning the channel // opening process. select { - case <-peer.activeSignal: - case <-peer.quit: + case <-peer.ActiveSignal(): + case <-peer.QuitSignal(): req.err <- fmt.Errorf("peer %x disconnected", pubKeyBytes) return req.updates, req.err case <-s.quit: @@ -3438,11 +3438,11 @@ func (s *server) OpenChannel( // Peers returns a slice of all active peers. // // NOTE: This function is safe for concurrent access. -func (s *server) Peers() []*peer { +func (s *server) Peers() []*peer.Brontide { s.mu.RLock() defer s.mu.RUnlock() - peers := make([]*peer, 0, len(s.peersByPub)) + peers := make([]*peer.Brontide, 0, len(s.peersByPub)) for _, peer := range s.peersByPub { peers = append(peers, peer) } diff --git a/test_utils.go b/test_utils.go index c9d6f653..78c2601e 100644 --- a/test_utils.go +++ b/test_utils.go @@ -1,33 +1,8 @@ package lnd import ( - "bytes" - crand "crypto/rand" - "encoding/binary" - "io" - "io/ioutil" - "math/rand" - "net" - "os" - "time" - - "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" - "github.com/lightningnetwork/lnd/chainntnfs" - "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/htlcswitch" - "github.com/lightningnetwork/lnd/input" - "github.com/lightningnetwork/lnd/keychain" - "github.com/lightningnetwork/lnd/lnwallet" - "github.com/lightningnetwork/lnd/lnwallet/chainfee" - "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/netann" - ppeer "github.com/lightningnetwork/lnd/peer" - "github.com/lightningnetwork/lnd/queue" - "github.com/lightningnetwork/lnd/shachain" - "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -53,9 +28,6 @@ var ( 0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, } - // Just use some arbitrary bytes as delivery script. - dummyDeliveryScript = alicesPrivKey[:] - // testTx is used as the default funding txn for single-funder channels. testTx = &wire.MsgTx{ Version: 1, @@ -90,352 +62,3 @@ var ( LockTime: 5, } ) - -// noUpdate is a function which can be used as a parameter in createTestPeer to -// call the setup code with no custom values on the channels set up. -var noUpdate = func(a, b *channeldb.OpenChannel) {} - -// createTestPeer creates a channel between two nodes, and returns a peer for -// one of the nodes, together with the channel seen from both nodes. It takes -// an updateChan function which can be used to modify the default values on -// the channel states for each peer. -func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx, - updateChan func(a, b *channeldb.OpenChannel)) (*peer, *lnwallet.LightningChannel, - func(), error) { - - aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes( - btcec.S256(), alicesPrivKey, - ) - aliceKeySigner := &keychain.PrivKeyDigestSigner{PrivKey: aliceKeyPriv} - bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes( - btcec.S256(), bobsPrivKey, - ) - - channelCapacity := btcutil.Amount(10 * 1e8) - channelBal := channelCapacity / 2 - aliceDustLimit := btcutil.Amount(200) - bobDustLimit := btcutil.Amount(1300) - csvTimeoutAlice := uint32(5) - csvTimeoutBob := uint32(4) - - prevOut := &wire.OutPoint{ - Hash: chainhash.Hash(testHdSeed), - Index: 0, - } - fundingTxIn := wire.NewTxIn(prevOut, nil, nil) - - aliceCfg := channeldb.ChannelConfig{ - ChannelConstraints: channeldb.ChannelConstraints{ - DustLimit: aliceDustLimit, - MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), - ChanReserve: btcutil.Amount(rand.Int63()), - MinHTLC: lnwire.MilliSatoshi(rand.Int63()), - MaxAcceptedHtlcs: uint16(rand.Int31()), - CsvDelay: uint16(csvTimeoutAlice), - }, - MultiSigKey: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - RevocationBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - PaymentBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - DelayBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - HtlcBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - } - bobCfg := channeldb.ChannelConfig{ - ChannelConstraints: channeldb.ChannelConstraints{ - DustLimit: bobDustLimit, - MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), - ChanReserve: btcutil.Amount(rand.Int63()), - MinHTLC: lnwire.MilliSatoshi(rand.Int63()), - MaxAcceptedHtlcs: uint16(rand.Int31()), - CsvDelay: uint16(csvTimeoutBob), - }, - MultiSigKey: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - RevocationBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - PaymentBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - DelayBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - HtlcBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - } - - bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize()) - if err != nil { - return nil, nil, nil, err - } - bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot) - bobFirstRevoke, err := bobPreimageProducer.AtIndex(0) - if err != nil { - return nil, nil, nil, err - } - bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:]) - - aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize()) - if err != nil { - return nil, nil, nil, err - } - alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot) - aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0) - if err != nil { - return nil, nil, nil, err - } - aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:]) - - aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns( - channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint, - bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit, - ) - if err != nil { - return nil, nil, nil, err - } - - alicePath, err := ioutil.TempDir("", "alicedb") - if err != nil { - return nil, nil, nil, err - } - - dbAlice, err := channeldb.Open(alicePath) - if err != nil { - return nil, nil, nil, err - } - - bobPath, err := ioutil.TempDir("", "bobdb") - if err != nil { - return nil, nil, nil, err - } - - dbBob, err := channeldb.Open(bobPath) - if err != nil { - return nil, nil, nil, err - } - - estimator := chainfee.NewStaticEstimator(12500, 0) - feePerKw, err := estimator.EstimateFeePerKW(1) - if err != nil { - return nil, nil, nil, err - } - - // TODO(roasbeef): need to factor in commit fee? - aliceCommit := channeldb.ChannelCommitment{ - CommitHeight: 0, - LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), - RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal), - FeePerKw: btcutil.Amount(feePerKw), - CommitFee: feePerKw.FeeForWeight(input.CommitWeight), - CommitTx: aliceCommitTx, - CommitSig: bytes.Repeat([]byte{1}, 71), - } - bobCommit := channeldb.ChannelCommitment{ - CommitHeight: 0, - LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), - RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal), - FeePerKw: btcutil.Amount(feePerKw), - CommitFee: feePerKw.FeeForWeight(input.CommitWeight), - CommitTx: bobCommitTx, - CommitSig: bytes.Repeat([]byte{1}, 71), - } - - var chanIDBytes [8]byte - if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil { - return nil, nil, nil, err - } - - shortChanID := lnwire.NewShortChanIDFromInt( - binary.BigEndian.Uint64(chanIDBytes[:]), - ) - - aliceChannelState := &channeldb.OpenChannel{ - LocalChanCfg: aliceCfg, - RemoteChanCfg: bobCfg, - IdentityPub: aliceKeyPub, - FundingOutpoint: *prevOut, - ShortChannelID: shortChanID, - ChanType: channeldb.SingleFunderTweaklessBit, - IsInitiator: true, - Capacity: channelCapacity, - RemoteCurrentRevocation: bobCommitPoint, - RevocationProducer: alicePreimageProducer, - RevocationStore: shachain.NewRevocationStore(), - LocalCommitment: aliceCommit, - RemoteCommitment: aliceCommit, - Db: dbAlice, - Packager: channeldb.NewChannelPackager(shortChanID), - FundingTxn: testTx, - } - bobChannelState := &channeldb.OpenChannel{ - LocalChanCfg: bobCfg, - RemoteChanCfg: aliceCfg, - IdentityPub: bobKeyPub, - FundingOutpoint: *prevOut, - ChanType: channeldb.SingleFunderTweaklessBit, - IsInitiator: false, - Capacity: channelCapacity, - RemoteCurrentRevocation: aliceCommitPoint, - RevocationProducer: bobPreimageProducer, - RevocationStore: shachain.NewRevocationStore(), - LocalCommitment: bobCommit, - RemoteCommitment: bobCommit, - Db: dbBob, - Packager: channeldb.NewChannelPackager(shortChanID), - } - - // Set custom values on the channel states. - updateChan(aliceChannelState, bobChannelState) - - aliceAddr := &net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 18555, - } - - if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil { - return nil, nil, nil, err - } - - bobAddr := &net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 18556, - } - - if err := bobChannelState.SyncPending(bobAddr, 0); err != nil { - return nil, nil, nil, err - } - - cleanUpFunc := func() { - os.RemoveAll(bobPath) - os.RemoveAll(alicePath) - } - - aliceSigner := &mockSigner{aliceKeyPriv} - bobSigner := &mockSigner{bobKeyPriv} - - alicePool := lnwallet.NewSigPool(1, aliceSigner) - channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, aliceChannelState, alicePool, - ) - if err != nil { - return nil, nil, nil, err - } - alicePool.Start() - - bobPool := lnwallet.NewSigPool(1, bobSigner) - channelBob, err := lnwallet.NewLightningChannel( - bobSigner, bobChannelState, bobPool, - ) - if err != nil { - return nil, nil, nil, err - } - bobPool.Start() - - chainIO := &mockChainIO{ - bestHeight: fundingBroadcastHeight, - } - wallet := &lnwallet.LightningWallet{ - WalletController: &mockWalletController{ - rootKey: aliceKeyPriv, - publishedTransactions: publTx, - }, - } - - _, currentHeight, err := chainIO.GetBestBlock() - if err != nil { - return nil, nil, nil, err - } - - htlcSwitch, err := htlcswitch.New(htlcswitch.Config{ - DB: dbAlice, - SwitchPackager: channeldb.NewSwitchPackager(), - Notifier: notifier, - FwdEventTicker: ticker.New( - htlcswitch.DefaultFwdEventInterval), - LogEventTicker: ticker.New( - htlcswitch.DefaultLogInterval), - AckEventTicker: ticker.New( - htlcswitch.DefaultAckInterval), - }, uint32(currentHeight)) - if err != nil { - return nil, nil, nil, err - } - if err = htlcSwitch.Start(); err != nil { - return nil, nil, nil, err - } - - nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner) - - const chanActiveTimeout = time.Minute - - chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{ - ChanStatusSampleInterval: 30 * time.Second, - ChanEnableTimeout: chanActiveTimeout, - ChanDisableTimeout: 2 * time.Minute, - DB: dbAlice, - Graph: dbAlice.ChannelGraph(), - MessageSigner: nodeSignerAlice, - OurPubKey: aliceKeyPub, - IsChannelActive: htlcSwitch.HasActiveLink, - ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil }, - }) - if err != nil { - return nil, nil, nil, err - } - if err = chanStatusMgr.Start(); err != nil { - return nil, nil, nil, err - } - - errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize) - if err != nil { - return nil, nil, nil, err - } - - var pubKey [33]byte - copy(pubKey[:], aliceKeyPub.SerializeCompressed()) - - cfgAddr := &lnwire.NetAddress{ - IdentityKey: aliceKeyPub, - Address: aliceAddr, - ChainNet: wire.SimNet, - } - - pCfg := ppeer.Config{ - Addr: cfgAddr, - PubKeyBytes: pubKey, - ErrorBuffer: errBuffer, - ChainIO: chainIO, - Switch: htlcSwitch, - - ChanActiveTimeout: chanActiveTimeout, - InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch), - - ChannelDB: dbAlice, - FeeEstimator: estimator, - Wallet: wallet, - ChainNotifier: notifier, - ChanStatusMgr: chanStatusMgr, - DisconnectPeer: func(b *btcec.PublicKey) error { return nil }, - } - - alicePeer := newPeer(pCfg) - - chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint()) - alicePeer.activeChannels[chanID] = channelAlice - - alicePeer.wg.Add(1) - go alicePeer.channelManager() - - return alicePeer, channelBob, cleanUpFunc, nil -}