fcff17c336
This commit will allow the general public to build lnd without jumping through hoops setting up their local git branches nicely with all of our forks.
267 lines
5.2 KiB
Go
267 lines
5.2 KiB
Go
package main
|
|
|
|
import (
|
|
"container/list"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/lightningnetwork/lnd/lndc"
|
|
"github.com/lightningnetwork/lnd/lnwallet"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
"github.com/roasbeef/btcd/wire"
|
|
)
|
|
|
|
var (
|
|
numNodes int32
|
|
)
|
|
|
|
// channelState...
|
|
type channelState uint8
|
|
|
|
const (
|
|
// TODO(roasbeef): others??
|
|
channelPending channelState = iota
|
|
channelOpen
|
|
channelClosed
|
|
channelDispute
|
|
channelPendingPayment
|
|
)
|
|
|
|
const (
|
|
numAllowedRetransmits = 5
|
|
pingInterval = 1 * time.Minute
|
|
|
|
outgoingQueueLen = 50
|
|
)
|
|
|
|
// outgoinMsg...
|
|
type outgoinMsg struct {
|
|
msg lnwire.Message
|
|
sentChan chan struct{}
|
|
}
|
|
|
|
// peer...
|
|
// inspired by btcd/peer.go
|
|
type peer struct {
|
|
// only to be used atomically
|
|
started int32
|
|
connected int32
|
|
disconnect int32
|
|
|
|
conn net.Conn
|
|
|
|
lightningAddr lndc.LNAdr
|
|
inbound bool
|
|
protocolVersion uint32
|
|
peerId int32
|
|
|
|
// For purposes of detecting retransmits, etc.
|
|
lastNMessages map[lnwire.Message]struct{}
|
|
|
|
sync.RWMutex
|
|
timeConnected time.Time
|
|
lastSend time.Time
|
|
lastRecv time.Time
|
|
bytesReceived uint64
|
|
bytesSent uint64
|
|
satoshisSent uint64
|
|
satoshisReceived uint64
|
|
// TODO(roasbeef): pings??
|
|
|
|
sendQueueSync chan struct{}
|
|
outgoingQueue chan outgoinMsg
|
|
sendQueue chan outgoinMsg
|
|
|
|
// Only will be set if the channel is in the 'pending' state.
|
|
reservation *lnwallet.ChannelReservation
|
|
|
|
lnChannel *lnwallet.LightningChannel
|
|
|
|
queueQuit chan struct{}
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// newPeer...
|
|
func newPeer(conn net.Conn, server *server) *peer {
|
|
return &peer{
|
|
conn: conn,
|
|
peerId: atomic.AddInt32(&numNodes, 1),
|
|
|
|
lastNMessages: make(map[lnwire.Message]struct{}),
|
|
|
|
sendQueueSync: make(chan struct{}, 1),
|
|
sendQueue: make(chan outgoinMsg, 1),
|
|
outgoingQueue: make(chan outgoinMsg, outgoingQueueLen),
|
|
|
|
queueQuit: make(chan struct{}),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (p *peer) Start() error {
|
|
if atomic.AddInt32(&p.started, 1) != 1 {
|
|
return nil
|
|
}
|
|
|
|
// TODO(roasbeef): version handshake
|
|
|
|
p.wg.Add(3)
|
|
go p.inHandler()
|
|
go p.queueHandler()
|
|
go p.outHandler()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *peer) Stop() error {
|
|
// If we're already disconnecting, just exit.
|
|
if atomic.AddInt32(&p.disconnect, 1) != 1 {
|
|
return nil
|
|
}
|
|
|
|
// Otherwise, close the connection if we're currently connected.
|
|
if atomic.LoadInt32(&p.connected) != 0 {
|
|
p.conn.Close()
|
|
}
|
|
|
|
// Signal all worker goroutines to gracefully exit.
|
|
close(p.quit)
|
|
|
|
return nil
|
|
}
|
|
|
|
// readNextMessage...
|
|
func (p *peer) readNextMessage() (lnwire.Message, []byte, error) {
|
|
// TODO(roasbeef): use our own net magic?
|
|
_, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, wire.TestNet)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return nextMsg, rawPayload, nil
|
|
}
|
|
|
|
// inHandler..
|
|
func (p *peer) inHandler() {
|
|
// TODO(roasbeef): set timeout for initial channel request or version
|
|
// exchange.
|
|
|
|
out:
|
|
for atomic.LoadInt32(&p.disconnect) == 0 {
|
|
nextMsg, _, err := p.readNextMessage()
|
|
if err != nil {
|
|
// TODO(roasbeef): log error
|
|
break out
|
|
}
|
|
|
|
// TODO(roasbeef): state-machine to track version exchange
|
|
switch msg := nextMsg.(type) {
|
|
// TODO(roasbeef): cases
|
|
}
|
|
}
|
|
|
|
p.wg.Done()
|
|
}
|
|
|
|
// writeMessage...
|
|
func (p *peer) writeMessage(msg lnwire.Message) error {
|
|
// Simply exit if we're shutting down.
|
|
if atomic.LoadInt32(&p.disconnect) != 0 {
|
|
return nil
|
|
}
|
|
|
|
_, err := lnwire.WriteMessage(p.conn, msg, 0,
|
|
wire.TestNet)
|
|
|
|
return err
|
|
}
|
|
|
|
// outHandler..
|
|
func (p *peer) outHandler() {
|
|
// pingTicker is used to periodically send pings to the remote peer.
|
|
pingTicker := time.NewTicker(pingInterval)
|
|
defer pingTicker.Stop()
|
|
|
|
out:
|
|
for {
|
|
select {
|
|
case outMsg := <-p.sendQueue:
|
|
switch m := outMsg.msg.(type) {
|
|
// TODO(roasbeef): handle special write cases
|
|
}
|
|
|
|
if err := p.writeMessage(outMsg.msg); err != nil {
|
|
// TODO(roasbeef): disconnect
|
|
}
|
|
|
|
// Synchronize with the outHandler.
|
|
p.sendQueueSync <- struct{}{}
|
|
case <-pingTicker.C:
|
|
// TODO(roasbeef): ping em
|
|
case <-p.quit:
|
|
break out
|
|
|
|
}
|
|
}
|
|
|
|
// Wait for the queueHandler to finish so we can empty out all pending
|
|
// messages avoiding a possible deadlock somewhere.
|
|
<-p.queueQuit
|
|
|
|
// Drain any lingering messages that we're meant to be sent. But since
|
|
// we're shutting down, just ignore them.
|
|
fin:
|
|
for {
|
|
select {
|
|
case msg := <-p.sendQueue:
|
|
if msg.sentChan != nil {
|
|
msg.sentChan <- struct{}{}
|
|
}
|
|
default:
|
|
break fin
|
|
}
|
|
}
|
|
p.wg.Done()
|
|
}
|
|
|
|
// queueHandler..
|
|
func (p *peer) queueHandler() {
|
|
waitOnSync := false
|
|
pendingMsgs := list.New()
|
|
out:
|
|
for {
|
|
select {
|
|
case msg := <-p.outgoingQueue:
|
|
if !waitOnSync {
|
|
p.sendQueue <- msg
|
|
} else {
|
|
pendingMsgs.PushBack(msg)
|
|
}
|
|
waitOnSync = true
|
|
case <-p.sendQueueSync:
|
|
// If there aren't any more remaining messages in the
|
|
// queue, then we're no longer waiting to synchronize
|
|
// with the outHandler.
|
|
next := pendingMsgs.Front()
|
|
if next == nil {
|
|
waitOnSync = false
|
|
continue
|
|
}
|
|
|
|
// Notify the outHandler about the next item to
|
|
// asynchronously send.
|
|
val := pendingMsgs.Remove(next)
|
|
p.sendQueue <- val.(outgoinMsg)
|
|
// TODO(roasbeef): other sync stuffs
|
|
case <-p.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
close(p.queueQuit)
|
|
p.wg.Done()
|
|
}
|