server+peer: add 'init' message support

In this commit the support for global and local feature vectors were
added in 'server' and 'peer' structures respectively. Also with commit
additional logic was added and now node waits to receive 'init'
lnwire.Message before sending/responding on any other messages.
This commit is contained in:
Andrey Samokhvalov 2017-02-16 15:39:38 +03:00 committed by Olaoluwa Osuntokun
parent bff55cb705
commit 6ce9ea29da
2 changed files with 93 additions and 5 deletions

81
peer.go

@ -5,7 +5,6 @@ import (
"container/list" "container/list"
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -25,6 +24,7 @@ import (
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"github.com/go-errors/errors"
) )
var ( var (
@ -151,6 +151,16 @@ type peer struct {
server *server server *server
// localSharedFeatures is a product of comparison of our and their
// local features vectors which consist of features which are present
// on both sides.
localSharedFeatures *lnwire.SharedFeatures
// globalSharedFeatures is a product of comparison of our and their
// global features vectors which consist of features which are present
// on both sides.
globalSharedFeatures *lnwire.SharedFeatures
queueQuit chan struct{} queueQuit chan struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -189,6 +199,9 @@ func newPeer(conn net.Conn, server *server, addr *lnwire.NetAddress,
localCloseChanReqs: make(chan *closeLinkReq), localCloseChanReqs: make(chan *closeLinkReq),
remoteCloseChanReqs: make(chan *lnwire.CloseRequest), remoteCloseChanReqs: make(chan *lnwire.CloseRequest),
localSharedFeatures: lnwire.NewSharedFeatures(localFeaturesMap),
globalSharedFeatures: lnwire.NewSharedFeatures(globalFeaturesMap),
queueQuit: make(chan struct{}), queueQuit: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -263,7 +276,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// Start starts all helper goroutines the peer needs for normal operations. // Start starts all helper goroutines the peer needs for normal operations.
// In the case this peer has already been started, then this function is a // In the case this peer has already been started, then this function is a
// noop. // loop.
func (p *peer) Start() error { func (p *peer) Start() error {
if atomic.AddInt32(&p.started, 1) != 1 { if atomic.AddInt32(&p.started, 1) != 1 {
return nil return nil
@ -271,10 +284,34 @@ func (p *peer) Start() error {
peerLog.Tracef("peer %v starting", p) peerLog.Tracef("peer %v starting", p)
p.wg.Add(5) p.wg.Add(2)
go p.readHandler()
go p.queueHandler() go p.queueHandler()
go p.writeHandler() go p.writeHandler()
// Exchange local and global features, the init message should be
// very first between two nodes.
if err := p.sendInitMsg(); err != nil {
return err
}
// Should wait for peers to compare their feature vectors
// and only then start message exchanges.
msg, _, err := p.readNextMessage()
if err != nil {
return err
}
if msg, ok := msg.(*lnwire.Init); ok {
if err := p.handleInitMsg(msg); err != nil {
return err
}
} else {
return errors.New("very first message between nodes " +
"must be init message")
}
p.wg.Add(3)
go p.readHandler()
go p.channelManager() go p.channelManager()
go p.pingHandler() go p.pingHandler()
@ -1205,6 +1242,42 @@ out:
peerLog.Tracef("htlcManager for peer %v done", p) peerLog.Tracef("htlcManager for peer %v done", p)
} }
// handleInitMsg handles the incoming init message which contains global and
// local features vectors. If feature vectors are incompatible then disconnect.
func (p *peer) handleInitMsg(msg *lnwire.Init) error {
localSharedFeatures, err := p.server.localFeatures.Compare(msg.LocalFeatures)
if err != nil {
err := errors.Errorf("can compare remote and local feature " +
"vectors: %v", err)
peerLog.Error(err)
return err
}
p.localSharedFeatures = localSharedFeatures
globalSharedFeatures, err := p.server.globalFeatures.Compare(msg.GlobalFeatures)
if err != nil {
err := errors.Errorf("can compare remote and global feature " +
"vectors: %v", err)
peerLog.Error(err)
return err
}
p.globalSharedFeatures = globalSharedFeatures
return nil
}
// sendInitMsg sends init message to remote peer which represent our
// features local and global vectors.
func (p *peer) sendInitMsg() error {
msg := lnwire.NewInitMessage(
p.server.globalFeatures,
p.server.localFeatures,
)
p.queueMsg(msg, nil)
return nil
}
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new // Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently // HTLCs, timeout previously cleared HTLCs, and finally to settle currently

@ -76,6 +76,14 @@ type server struct {
donePeers chan *peer donePeers chan *peer
queries chan interface{} queries chan interface{}
// globalFeatures feature vector which affects HTLCs and thus are also
// advertised to other nodes.
globalFeatures *lnwire.FeatureVector
// localFeatures is an feature vector which represent the features which
// only affect the protocol between these two nodes.
localFeatures *lnwire.FeatureVector
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
@ -129,6 +137,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
broadcastRequests: make(chan *broadcastReq), broadcastRequests: make(chan *broadcastReq),
sendRequests: make(chan *sendReq), sendRequests: make(chan *sendReq),
globalFeatures: lnwire.NewFeatureVector(globalFeaturesMap),
localFeatures: lnwire.NewFeatureVector(localFeaturesMap),
queries: make(chan interface{}), queries: make(chan interface{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -413,7 +424,11 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound
// TODO(roasbeef): update IP address for link-node // TODO(roasbeef): update IP address for link-node
// * also mark last-seen, do it one single transaction? // * also mark last-seen, do it one single transaction?
peer.Start() if err := peer.Start(); err != nil {
conn.Close()
return
}
s.newPeers <- peer s.newPeers <- peer
} }