lnd.xprv/watchtower/wtserver/server.go

686 lines
19 KiB
Go
Raw Normal View History

package wtserver
import (
"bytes"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower/blob"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
"github.com/lightningnetwork/lnd/watchtower/wtwire"
)
// ErrPeerAlreadyConnected is returned when a peer tries to register under a
// session id that is already held by another active peer of the server.
var ErrPeerAlreadyConnected = errors.New("peer already connected")
// Config abstracts the primary components and dependencies of the server. The
// pointer passed to New is retained by the returned Server.
type Config struct {
// DB provides persistent access to the server's sessions and is used
// for storing state updates. The server looks up and inserts session
// info through it when handling CreateSession, and inserts updates
// through it when handling StateUpdate.
DB DB
// NodePrivKey is the private key to be used in accepting new brontide
// connections.
NodePrivKey *btcec.PrivateKey
// Listeners are the listeners on which clients may connect. They are
// handed to the connection manager created in New.
Listeners []net.Listener
// ReadTimeout specifies how long a client may go without sending a
// message. It is applied as a fresh read deadline before every read.
ReadTimeout time.Duration
// WriteTimeout specifies how long a client may go without reading a
// message from the other end, if the connection has stopped buffering
// the server's replies. It is applied as a fresh write deadline before
// every write.
WriteTimeout time.Duration
// NewAddress is used to generate reward addresses, where a cut of
// successfully sent funds can be received. It is invoked while handling
// a CreateSession for a session id that is not yet in the database.
NewAddress func() (btcutil.Address, error)
// ChainHash identifies the network that the server is watching. It is
// included in the Init message the server sends to each client.
ChainHash chainhash.Hash
// NoAckUpdates causes the server to not acknowledge state updates: the
// update is still inserted into the DB, but no StateUpdateReply is sent
// and an error is returned to the caller instead. This should only be
// used for testing.
NoAckUpdates bool
}
// Server houses the state required to handle watchtower peers. Its primary job
// is to accept incoming connections, and dispatch processing of the client
// message streams.
type Server struct {
// started and stopped ensure the bodies of Start and Stop each run at
// most once.
started sync.Once
stopped sync.Once
// cfg is the configuration the server was created with.
cfg *Config
// connMgr accepts connections on the configured listeners and reports
// them through inboundPeerConnected.
connMgr *connmgr.ConnManager
// clientMtx guards clients, the set of currently registered peers keyed
// by session id.
clientMtx sync.RWMutex
clients map[wtdb.SessionID]Peer
// newPeers carries freshly accepted peers from InboundPeerConnected to
// the peerHandler goroutine.
newPeers chan Peer
// localInit is the Init message sent to every client during the
// handshake.
localInit *wtwire.Init
// wg tracks the peerHandler goroutine and every client handler spawned
// by it; Stop waits on it.
wg sync.WaitGroup
// quit is closed by Stop to signal all goroutines to exit.
quit chan struct{}
}
// New builds a watchtower server from the given config. The returned server
// accepts clients on the configured listeners once started, letting them open
// sessions and submit state updates. An error is returned only if the
// underlying connection manager cannot be constructed.
func New(cfg *Config) (*Server, error) {
	// The Init message advertised to every client: our feature vector and
	// the chain we are watching.
	features := lnwire.NewRawFeatureVector(wtwire.WtSessionsOptional)
	initMsg := wtwire.NewInitMessage(features, cfg.ChainHash)

	srv := &Server{
		cfg:       cfg,
		clients:   make(map[wtdb.SessionID]Peer),
		newPeers:  make(chan Peer),
		localInit: initMsg,
		quit:      make(chan struct{}),
	}

	// The tower only ever accepts connections, so outbound dialing is
	// wired to a stub that always fails.
	connCfg := connmgr.Config{
		Listeners: cfg.Listeners,
		OnAccept:  srv.inboundPeerConnected,
		Dial:      noDial,
	}

	mgr, err := connmgr.New(&connCfg)
	if err != nil {
		return nil, err
	}
	srv.connMgr = mgr

	return srv, nil
}
// Start launches the peer handler goroutine and starts the connection manager
// so the server begins accepting clients. Only the first call has any effect;
// the returned error is always nil.
func (s *Server) Start() error {
	launch := func() {
		log.Infof("Starting watchtower server")

		// The peer handler must be running before connections are
		// accepted, since it is the only receiver of new peers.
		s.wg.Add(1)
		go s.peerHandler()

		s.connMgr.Start()

		log.Infof("Watchtower server started successfully")
	}
	s.started.Do(launch)

	return nil
}
// Stop shuts down the server's listeners and any active requests, blocking
// until every goroutine tracked by the server has exited. Only the first call
// has any effect; the returned error is always nil.
func (s *Server) Stop() error {
	shutdown := func() {
		log.Infof("Stopping watchtower server")

		// Stop accepting connections first, then signal the handlers
		// to exit and wait for them to finish.
		s.connMgr.Stop()
		close(s.quit)
		s.wg.Wait()

		log.Infof("Watchtower server stopped successfully")
	}
	s.stopped.Do(shutdown)

	return nil
}
// inboundPeerConnected is the accept callback registered with the connection
// manager. Connections that implement the server.Peer interface are forwarded
// to the public InboundPeerConnected method; any other connection is logged and
// closed.
func (s *Server) inboundPeerConnected(c net.Conn) {
	if peer, ok := c.(Peer); ok {
		s.InboundPeerConnected(peer)
		return
	}

	log.Warnf("incoming connection %T does not satisfy "+
		"server.Peer interface", c)
	c.Close()
}
// InboundPeerConnected hands a server.Peer over to the peer handler, which
// serves the requests submitted by that client. It also acts as the public
// entry point for registering clients locally. The call blocks until the peer
// is picked up or the server is shutting down, in which case the peer is
// dropped without being handled.
func (s *Server) InboundPeerConnected(peer Peer) {
	select {
	case <-s.quit:
	case s.newPeers <- peer:
	}
}
// peerHandler receives newly accepted peers and starts a client handler
// goroutine for each one. Spawning the waitgrouped client handlers from this
// (itself waitgrouped) goroutine is what keeps the wait group accounting sound.
// On shutdown all registered peers are released before the goroutine exits.
//
// NOTE: This method MUST be run as a goroutine.
func (s *Server) peerHandler() {
	defer s.wg.Done()
	defer s.removeAllPeers()

	for {
		select {
		case <-s.quit:
			return

		case newPeer := <-s.newPeers:
			s.wg.Add(1)
			go s.handleClient(newPeer)
		}
	}
}
// handleClient processes a series of watchtower messages sent by a client.
// After exchanging Init messages, the client may either send:
//   * a single CreateSession message.
//   * a series of StateUpdate messages.
//
// This method uses the server's peer map to ensure at most one peer using the
// same session id can enter the main event loop. The connection will be
// dropped by the watchtower if no messages are sent or received by the
// configured Read/WriteTimeouts. On every exit path after registration the
// peer is removed from the peer map, which also closes its connection.
//
// NOTE: This method MUST be run as a goroutine.
func (s *Server) handleClient(peer Peer) {
	defer s.wg.Done()

	// Use the connection's remote pubkey as the client's session id.
	id := wtdb.NewSessionIDFromPubKey(peer.RemotePub())

	// Register this peer in the server's client map, and defer the
	// connection's cleanup. If the peer already exists, we will close the
	// connection and exit immediately.
	err := s.addPeer(&id, peer)
	if err != nil {
		peer.Close()
		return
	}
	defer s.removePeer(&id, peer.RemoteAddr())

	// The first message from the client must be its Init message.
	msg, err := s.readMessage(peer)
	if err != nil {
		log.Errorf("Unable to read message from client %s@%s: %v",
			id, peer.RemoteAddr(), err)
		return
	}

	remoteInit, ok := msg.(*wtwire.Init)
	if !ok {
		log.Errorf("Client %s@%s did not send Init msg as first "+
			"message", id, peer.RemoteAddr())
		return
	}

	// Reply with our own Init before validating the remote one, so the
	// client always learns our parameters even if we end up rejecting it.
	err = s.sendMessage(peer, s.localInit)
	if err != nil {
		log.Errorf("Unable to send Init msg to %s: %v", id, err)
		return
	}

	err = s.localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
	if err != nil {
		log.Errorf("Cannot support client %s: %v", id, err)
		return
	}

	// stateUpdateOnlyMode will become true if the client's first message is
	// a StateUpdate. If instead, it is a CreateSession, this method will exit
	// immediately after replying. We track this to ensure that the client
	// can't send a CreateSession after having already sent a StateUpdate.
	var stateUpdateOnlyMode bool

	for {
		// Bail out between messages if the server is shutting down.
		select {
		case <-s.quit:
			return
		default:
		}

		nextMsg, err := s.readMessage(peer)
		if err != nil {
			log.Errorf("Unable to read watchtower msg from %x: %v",
				id[:], err)
			return
		}

		// Process the request according to the message's type.
		switch msg := nextMsg.(type) {

		// A CreateSession indicates a request to establish a new session
		// with our watchtower.
		case *wtwire.CreateSession:
			// Ensure CreateSession can only be sent as the first
			// message.
			if stateUpdateOnlyMode {
				// The session id is formatted with %s like
				// every other log line in this file.
				log.Errorf("client %s sent CreateSession after "+
					"StateUpdate", id)
				return
			}

			// Attempt to open a new session for this client.
			err := s.handleCreateSession(peer, &id, msg)
			if err != nil {
				log.Errorf("Unable to handle CreateSession "+
					"from %s: %v", id, err)
			}

			// Exit after replying to CreateSession.
			return

		// A StateUpdate indicates an existing client attempting to
		// back-up a revoked commitment state.
		case *wtwire.StateUpdate:
			// Try to accept the state update from the client.
			err := s.handleStateUpdate(peer, &id, msg)
			if err != nil {
				log.Errorf("unable to handle StateUpdate "+
					"from %s: %v", id, err)
				return
			}

			// If the client signals that this is last StateUpdate
			// message, we can disconnect the client.
			if msg.IsComplete == 1 {
				return
			}

			// The client has signaled that more StateUpdates are
			// yet to come. Enter state-update-only mode to disallow
			// future sends of CreateSession messages.
			stateUpdateOnlyMode = true

		default:
			log.Errorf("received unsupported message type: %T "+
				"from %s", nextMsg, id)
			return
		}
	}
}
// handleCreateSession processes a CreateSession message from the peer, and
// sends a CreateSessionReply in response. This method will only succeed if no
// existing session info is known about the session id. If an existing session
// is found, the reward address is returned in case the client lost our reply.
//
// The returned error is nil only if the session was created and the reply was
// written successfully. For every rejected or failed request a reply carrying
// the failure code is sent, and a non-nil error is returned so the caller
// disconnects the client.
func (s *Server) handleCreateSession(peer Peer, id *wtdb.SessionID,
	req *wtwire.CreateSession) error {

	// TODO(conner): validate accept against policy

	// Query the db for session info belonging to the client's session id.
	existingInfo, err := s.cfg.DB.GetSessionInfo(id)
	switch {

	// We already have a session corresponding to this session id, return an
	// error signaling that it already exists in our database. We return the
	// reward address to the client in case they were not able to process
	// our reply earlier.
	case err == nil:
		log.Debugf("Already have session for %s", id)
		return s.replyCreateSession(
			peer, id, wtwire.CreateSessionCodeAlreadyExists,
			existingInfo.RewardAddress,
		)

	// Some other database error occurred, return a temporary failure.
	case err != wtdb.ErrSessionNotFound:
		log.Errorf("unable to load session info for %s: %v", id, err)
		return s.replyCreateSession(
			peer, id, wtwire.CodeTemporaryFailure, nil,
		)
	}

	// Ensure that the requested blob type is supported by our tower. This
	// is checked before a reward address is requested so that a request we
	// are going to reject never causes a fresh address to be generated.
	if !blob.IsSupportedType(req.BlobType) {
		log.Debugf("Rejecting CreateSession from %s, unsupported blob "+
			"type %s", id, req.BlobType)
		return s.replyCreateSession(
			peer, id, wtwire.CreateSessionCodeRejectBlobType, nil,
		)
	}

	// Now that we've established that this session does not exist in the
	// database, retrieve the sweep address that will be given to the
	// client. This address is to be included by the client when signing
	// sweep transactions destined for this tower, if its negotiated output
	// is not dust.
	rewardAddress, err := s.cfg.NewAddress()
	if err != nil {
		log.Errorf("unable to generate reward addr for %s: %v", id, err)
		return s.replyCreateSession(
			peer, id, wtwire.CodeTemporaryFailure, nil,
		)
	}

	// Construct the pkscript the client should pay to when signing justice
	// transactions for this session.
	rewardScript, err := txscript.PayToAddrScript(rewardAddress)
	if err != nil {
		log.Errorf("unable to generate reward script for %s: %v",
			id, err)
		return s.replyCreateSession(
			peer, id, wtwire.CodeTemporaryFailure, nil,
		)
	}

	// TODO(conner): create invoice for upfront payment

	// Assemble the session info using the agreed upon parameters, reward
	// address, and session id.
	info := wtdb.SessionInfo{
		ID: *id,
		Policy: wtpolicy.Policy{
			BlobType:     req.BlobType,
			MaxUpdates:   req.MaxUpdates,
			RewardBase:   req.RewardBase,
			RewardRate:   req.RewardRate,
			SweepFeeRate: req.SweepFeeRate,
		},
		RewardAddress: rewardScript,
	}

	// Insert the session info into the watchtower's database. If
	// successful, the session will now be ready for use.
	err = s.cfg.DB.InsertSessionInfo(&info)
	if err != nil {
		log.Errorf("unable to create session for %s: %v", id, err)
		return s.replyCreateSession(
			peer, id, wtwire.CodeTemporaryFailure, nil,
		)
	}

	log.Infof("Accepted session for %s", id)

	return s.replyCreateSession(
		peer, id, wtwire.CodeOK, rewardScript,
	)
}
// handleStateUpdate serves a StateUpdate request from a client. The update is
// handed to the db for insertion, which validates it against the client's
// session. The outcome is translated into one of the codes defined by the
// watchtower wire protocol and reported to the client in a StateUpdateReply.
//
// When the server is configured with NoAckUpdates, no reply is written and a
// connFailure carrying the computed code is returned instead.
func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID,
	update *wtwire.StateUpdate) error {

	lastApplied, err := s.cfg.DB.InsertStateUpdate(
		&wtdb.SessionStateUpdate{
			ID:            *id,
			Hint:          update.Hint,
			SeqNum:        update.SeqNum,
			LastApplied:   update.LastApplied,
			EncryptedBlob: update.EncryptedBlob,
		},
	)

	// Map the db result onto the code sent back to the client.
	var failCode wtwire.ErrorCode
	switch err {
	case nil:
		failCode = wtwire.CodeOK
		log.Debugf("State update %d accepted for %s",
			update.SeqNum, id)

	// An update for a session we don't know, or one replaying a sequence
	// number that was already applied, is a permanent failure.
	//
	// TODO(conner): remove session state for protocol
	// violation. Could also double as clean up method for
	// session-related state.
	case wtdb.ErrSessionNotFound, wtdb.ErrSeqNumAlreadyApplied:
		failCode = wtwire.CodePermanentFailure

	case wtdb.ErrLastAppliedReversion:
		failCode = wtwire.StateUpdateCodeClientBehind

	case wtdb.ErrSessionConsumed:
		failCode = wtwire.StateUpdateCodeMaxUpdatesExceeded

	case wtdb.ErrUpdateOutOfOrder:
		failCode = wtwire.StateUpdateCodeSeqNumOutOfOrder

	// Anything else is treated as a temporary failure.
	default:
		failCode = wtwire.CodeTemporaryFailure
	}

	if !s.cfg.NoAckUpdates {
		return s.replyStateUpdate(peer, id, failCode, lastApplied)
	}

	// Acks are disabled: skip the reply and fail the connection with the
	// code we would otherwise have sent.
	return &connFailure{
		ID:   *id,
		Code: uint16(failCode),
	}
}
// connFailure is a default error used when a request failed with a non-zero
// error code. It is also returned by handleStateUpdate, in place of a reply,
// whenever the server is configured with NoAckUpdates.
type connFailure struct {
// ID is the session id of the client whose request failed.
ID wtdb.SessionID
// Code is the wire error code associated with the failure, converted to
// a uint16.
Code uint16
}
// Error implements the error interface, reporting the SessionID and Code that
// caused the connection failure.
func (f *connFailure) Error() string {
	const format = "connection with %s failed with code=%v"

	return fmt.Sprintf(format, f.ID, f.Code)
}
// replyCreateSession writes a CreateSessionReply carrying the given code and
// data to the client. When the code is OK the result of the write is returned
// as is. For any other code a connFailure is returned, whether or not the write
// succeeded, so that the caller stops talking to the client.
func (s *Server) replyCreateSession(peer Peer, id *wtdb.SessionID,
	code wtwire.ErrorCode, data []byte) error {

	reply := &wtwire.CreateSessionReply{
		Code: code,
		Data: data,
	}

	writeErr := s.sendMessage(peer, reply)
	if writeErr != nil {
		log.Errorf("unable to send CreateSessionReply to %s", id)
	}

	// A failed request always results in a connection failure so that the
	// client gets disconnected.
	if code != wtwire.CodeOK {
		return &connFailure{
			ID:   *id,
			Code: uint16(code),
		}
	}

	// The request itself succeeded, so only the write can have failed.
	return writeErr
}
// replyStateUpdate writes a StateUpdateReply carrying the given code and last
// applied value to the client. When the code is OK the result of the write is
// returned as is. For any other code a connFailure is returned, whether or not
// the write succeeded, so that the caller stops talking to the client.
func (s *Server) replyStateUpdate(peer Peer, id *wtdb.SessionID,
	code wtwire.StateUpdateCode, lastApplied uint16) error {

	reply := &wtwire.StateUpdateReply{
		Code:        code,
		LastApplied: lastApplied,
	}

	writeErr := s.sendMessage(peer, reply)
	if writeErr != nil {
		log.Errorf("unable to send StateUpdateReply to %s", id)
	}

	// A failed request always results in a connection failure so that the
	// client gets disconnected.
	if code != wtwire.CodeOK {
		return &connFailure{
			ID:   *id,
			Code: uint16(code),
		}
	}

	// The request itself succeeded, so only the write can have failed.
	return writeErr
}
// readMessage reads the next raw message from the peer and decodes it into a
// watchtower wire message. It fails if the read deadline derived from the
// server's ReadTimeout cannot be set, if reading off the wire fails (including
// hitting that deadline), or if the payload cannot be deserialized.
func (s *Server) readMessage(peer Peer) (wtwire.Message, error) {
	// Arm a fresh read deadline so an idle client gets dropped.
	deadline := time.Now().Add(s.cfg.ReadTimeout)
	if err := peer.SetReadDeadline(deadline); err != nil {
		return nil, fmt.Errorf("unable to set read deadline: %v", err)
	}

	// Fetch the next message from the wire.
	payload, err := peer.ReadNextMessage()
	if err != nil {
		return nil, fmt.Errorf("unable to read message: %v", err)
	}

	// Decode it according to the watchtower wire specification.
	parsed, err := wtwire.ReadMessage(bytes.NewReader(payload), 0)
	if err != nil {
		return nil, fmt.Errorf("unable to parse message: %v", err)
	}

	logMessage(peer, parsed, true)

	return parsed, nil
}
// sendMessage serializes a watchtower wire message and writes it to the target
// peer, using a write deadline derived from the server's WriteTimeout. An error
// is returned if encoding, setting the deadline, or the write itself fails.
func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error {
	// TODO(conner): use buffer pool?

	var encoded bytes.Buffer
	if _, err := wtwire.WriteMessage(&encoded, msg, 0); err != nil {
		return fmt.Errorf("unable to encode msg: %v", err)
	}

	deadline := time.Now().Add(s.cfg.WriteTimeout)
	if err := peer.SetWriteDeadline(deadline); err != nil {
		return fmt.Errorf("unable to set write deadline: %v", err)
	}

	logMessage(peer, msg, false)

	_, err := peer.Write(encoded.Bytes())
	return err
}
// addPeer registers a client in the server's client map under its session id.
// ErrPeerAlreadyConnected is returned, and the map left untouched, if another
// client is already registered under the same session id.
func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error {
	s.clientMtx.Lock()
	defer s.clientMtx.Unlock()

	current, connected := s.clients[*id]
	if connected {
		log.Infof("Already connected to peer %s@%s, disconnecting %s",
			id, current.RemoteAddr(), peer.RemoteAddr())
		return ErrPeerAlreadyConnected
	}

	s.clients[*id] = peer

	log.Infof("Accepted incoming peer %s@%s",
		id, peer.RemoteAddr())

	return nil
}
// removePeer drops the client registered under the given session id from the
// server's client map. If such a client was registered, its connection is
// closed after the map lock has been released.
func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) {
	log.Infof("Releasing incoming peer %s@%s", id, addr)

	s.clientMtx.Lock()
	registered, found := s.clients[*id]
	delete(s.clients, *id)
	s.clientMtx.Unlock()

	if !found {
		return
	}

	registered.Close()
}
// removeAllPeers empties the server's client map, closing the connection of
// every peer that was registered. The map lock is held for the whole sweep.
func (s *Server) removeAllPeers() {
	s.clientMtx.Lock()
	defer s.clientMtx.Unlock()

	for sessionID, conn := range s.clients {
		log.Infof("Releasing incoming peer %s@%s", sessionID,
			conn.RemoteAddr())

		delete(s.clients, sessionID)
		conn.Close()
	}
}
// logMessage emits a debug log line describing a message exchanged with a
// remote peer. The wording reflects the direction: "Received ... from" when
// read is true, "Sending ... to" otherwise. A non-empty message summary is
// appended in parentheses after the message type.
func logMessage(peer Peer, msg wtwire.Message, read bool) {
	action, preposition := "Sending", "to"
	if read {
		action, preposition = "Received", "from"
	}

	summary := wtwire.MessageSummary(msg)
	if summary != "" {
		summary = "(" + summary + ")"
	}

	remoteKey := peer.RemotePub().SerializeCompressed()

	log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
		preposition, remoteKey, peer.RemoteAddr())
}
// noDial is the placeholder dial function handed to the server's connmgr. The
// watchtower never initiates connections, so every call fails with an error and
// a nil connection.
func noDial(_ net.Addr) (net.Conn, error) {
	var none net.Conn

	return none, fmt.Errorf("watchtower cannot make outgoing conns")
}