From 25b2a352cbd0f5f0e39359baee4cad2e935c1217 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 19 Mar 2019 19:38:20 -0700 Subject: [PATCH] watchtower/wtserver: refactor server handlers into own files --- watchtower/wtserver/create_session.go | 136 +++++++++++ watchtower/wtserver/server.go | 313 ++------------------------ watchtower/wtserver/state_update.go | 157 +++++++++++++ 3 files changed, 316 insertions(+), 290 deletions(-) create mode 100644 watchtower/wtserver/create_session.go create mode 100644 watchtower/wtserver/state_update.go diff --git a/watchtower/wtserver/create_session.go b/watchtower/wtserver/create_session.go new file mode 100644 index 00000000..e948d8b9 --- /dev/null +++ b/watchtower/wtserver/create_session.go @@ -0,0 +1,136 @@ +package wtserver + +import ( + "github.com/btcsuite/btcd/txscript" + "github.com/lightningnetwork/lnd/watchtower/blob" + "github.com/lightningnetwork/lnd/watchtower/wtdb" + "github.com/lightningnetwork/lnd/watchtower/wtpolicy" + "github.com/lightningnetwork/lnd/watchtower/wtwire" +) + +// handleCreateSession processes a CreateSession message from the peer, and returns +// a CreateSessionReply in response. This method will only succeed if no existing +// session info is known about the session id. If an existing session is found, +// the reward address is returned in case the client lost our reply. +func (s *Server) handleCreateSession(peer Peer, id *wtdb.SessionID, + req *wtwire.CreateSession) error { + + // TODO(conner): validate accept against policy + + // Query the db for session info belonging to the client's session id. + existingInfo, err := s.cfg.DB.GetSessionInfo(id) + switch { + + // We already have a session corresponding to this session id, return an + // error signaling that it already exists in our database. We return the + // reward address to the client in case they were not able to process + // our reply earlier. + case err == nil: + log.Debugf("Already have session for %s", id) + return s.replyCreateSession( + peer, id, wtwire.CreateSessionCodeAlreadyExists, + existingInfo.RewardAddress, + ) + + // Some other database error occurred, return a temporary failure. + case err != wtdb.ErrSessionNotFound: + log.Errorf("unable to load session info for %s", id) + return s.replyCreateSession( + peer, id, wtwire.CodeTemporaryFailure, nil, + ) + } + + // Now that we've established that this session does not exist in the + // database, retrieve the sweep address that will be given to the + // client. This address is to be included by the client when signing + // sweep transactions destined for this tower, if its negotiated output + // is not dust. + rewardAddress, err := s.cfg.NewAddress() + if err != nil { + log.Errorf("unable to generate reward addr for %s", id) + return s.replyCreateSession( + peer, id, wtwire.CodeTemporaryFailure, nil, + ) + } + + // Construct the pkscript the client should pay to when signing justice + // transactions for this session. + rewardScript, err := txscript.PayToAddrScript(rewardAddress) + if err != nil { + log.Errorf("unable to generate reward script for %s", id) + return s.replyCreateSession( + peer, id, wtwire.CodeTemporaryFailure, nil, + ) + } + + // Ensure that the requested blob type is supported by our tower. + if !blob.IsSupportedType(req.BlobType) { + log.Debugf("Rejecting CreateSession from %s, unsupported blob "+ + "type %s", id, req.BlobType) + return s.replyCreateSession( + peer, id, wtwire.CreateSessionCodeRejectBlobType, nil, + ) + } + + // TODO(conner): create invoice for upfront payment + + // Assemble the session info using the agreed upon parameters, reward + // address, and session id. + info := wtdb.SessionInfo{ + ID: *id, + Policy: wtpolicy.Policy{ + BlobType: req.BlobType, + MaxUpdates: req.MaxUpdates, + RewardBase: req.RewardBase, + RewardRate: req.RewardRate, + SweepFeeRate: req.SweepFeeRate, + }, + RewardAddress: rewardScript, + } + + // Insert the session info into the watchtower's database. If + // successful, the session will now be ready for use. + err = s.cfg.DB.InsertSessionInfo(&info) + if err != nil { + log.Errorf("unable to create session for %s", id) + return s.replyCreateSession( + peer, id, wtwire.CodeTemporaryFailure, nil, + ) + } + + log.Infof("Accepted session for %s", id) + + return s.replyCreateSession( + peer, id, wtwire.CodeOK, rewardScript, + ) +} + +// replyCreateSession sends a response to a CreateSession from a client. If the +// status code in the reply is OK, the error from the write will be bubbled up. +// Otherwise, this method returns a connection error to ensure we don't continue +// communication with the client. +func (s *Server) replyCreateSession(peer Peer, id *wtdb.SessionID, + code wtwire.ErrorCode, data []byte) error { + + msg := &wtwire.CreateSessionReply{ + Code: code, + Data: data, + } + + err := s.sendMessage(peer, msg) + if err != nil { + log.Errorf("unable to send CreateSessionReply to %s", id) + } + + // Return the write error if the request succeeded. + if code == wtwire.CodeOK { + return err + } + + // Otherwise the request failed, return a connection failure to + // disconnect the client. + return &connFailure{ + ID: *id, + Code: uint16(code), + } +} diff --git a/watchtower/wtserver/server.go b/watchtower/wtserver/server.go index b9d22275..d1dc9b4c 100644 --- a/watchtower/wtserver/server.go +++ b/watchtower/wtserver/server.go @@ -11,12 +11,9 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/connmgr" - "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/watchtower/blob" "github.com/lightningnetwork/lnd/watchtower/wtdb" - "github.com/lightningnetwork/lnd/watchtower/wtpolicy" "github.com/lightningnetwork/lnd/watchtower/wtwire" ) @@ -24,6 +21,10 @@ var ( // ErrPeerAlreadyConnected signals that a peer with the same session id // is already active within the server. ErrPeerAlreadyConnected = errors.New("peer already connected") + + // ErrServerExiting signals that a request could not be processed + // because the server has been requested to shut down. + ErrServerExiting = errors.New("server shutting down") ) // Config abstracts the primary components and dependencies of the server. @@ -242,241 +243,33 @@ func (s *Server) handleClient(peer Peer) { return } - // stateUpdateOnlyMode will become true if the client's first message is - // a StateUpdate. If instead, it is a CreateSession, this method will exit - // immediately after replying. We track this to ensure that the client - // can't send a CreateSession after having already sent a StateUpdate. - var stateUpdateOnlyMode bool - for { - select { - case <-s.quit: - return - default: - } + nextMsg, err := s.readMessage(peer) + if err != nil { + log.Errorf("Unable to read watchtower msg from %s: %v", + id, err) + return + } - nextMsg, err := s.readMessage(peer) + switch msg := nextMsg.(type) { + case *wtwire.CreateSession: + // Attempt to open a new session for this client. + err = s.handleCreateSession(peer, &id, msg) if err != nil { - log.Errorf("Unable to read watchtower msg from %x: %v", - id[:], err) - return + log.Errorf("Unable to handle CreateSession "+ + "from %s: %v", id, err) } - // Process the request according to the message's type. - switch msg := nextMsg.(type) { - - // A CreateSession indicates a request to establish a new session - // with our watchtower. - case *wtwire.CreateSession: - // Ensure CreateSession can only be sent as the first - // message. - if stateUpdateOnlyMode { - log.Errorf("client %x sent CreateSession after "+ - "StateUpdate", id) - return - } - - // Attempt to open a new session for this client. - err := s.handleCreateSession(peer, &id, msg) - if err != nil { - log.Errorf("Unable to handle CreateSession "+ - "from %s: %v", id, err) - } - - // Exit after replying to CreateSession. - return - - // A StateUpdate indicates an existing client attempting to - // back-up a revoked commitment state. - case *wtwire.StateUpdate: - // Try to accept the state update from the client. - err := s.handleStateUpdate(peer, &id, msg) - if err != nil { - log.Errorf("unable to handle StateUpdate "+ - "from %s: %v", id, err) - return - } - - // If the client signals that this is last StateUpdate - // message, we can disconnect the client. - if msg.IsComplete == 1 { - return - } - - // The client has signaled that more StateUpdates are - // yet to come. Enter state-update-only mode to disallow - // future sends of CreateSession messages. - stateUpdateOnlyMode = true - - default: - log.Errorf("received unsupported message type: %T "+ - "from %s", nextMsg, id) - return + case *wtwire.StateUpdate: + err = s.handleStateUpdates(peer, &id, msg) + if err != nil { + log.Errorf("Unable to handle StateUpdate "+ + "from %s: %v", id, err) } - } -} - -// handleCreateSession processes a CreateSession message from the peer, and returns -// a CreateSessionReply in response. This method will only succeed if no existing -// session info is known about the session id. If an existing session is found, -// the reward address is returned in case the client lost our reply. -func (s *Server) handleCreateSession(peer Peer, id *wtdb.SessionID, - req *wtwire.CreateSession) error { - - // TODO(conner): validate accept against policy - - // Query the db for session info belonging to the client's session id. - existingInfo, err := s.cfg.DB.GetSessionInfo(id) - switch { - - // We already have a session corresponding to this session id, return an - // error signaling that it already exists in our database. We return the - // reward address to the client in case they were not able to process - // our reply earlier. - case err == nil: - log.Debugf("Already have session for %s", id) - return s.replyCreateSession( - peer, id, wtwire.CreateSessionCodeAlreadyExists, - existingInfo.RewardAddress, - ) - - // Some other database error occurred, return a temporary failure. - case err != wtdb.ErrSessionNotFound: - log.Errorf("unable to load session info for %s", id) - return s.replyCreateSession( - peer, id, wtwire.CodeTemporaryFailure, nil, - ) - } - - // Now that we've established that this session does not exist in the - // database, retrieve the sweep address that will be given to the - // client. This address is to be included by the client when signing - // sweep transactions destined for this tower, if its negotiated output - // is not dust. - rewardAddress, err := s.cfg.NewAddress() - if err != nil { - log.Errorf("unable to generate reward addr for %s", id) - return s.replyCreateSession( - peer, id, wtwire.CodeTemporaryFailure, nil, - ) - } - - // Construct the pkscript the client should pay to when signing justice - // transactions for this session. - rewardScript, err := txscript.PayToAddrScript(rewardAddress) - if err != nil { - log.Errorf("unable to generate reward script for %s", id) - return s.replyCreateSession( - peer, id, wtwire.CodeTemporaryFailure, nil, - ) - } - - // Ensure that the requested blob type is supported by our tower. - if !blob.IsSupportedType(req.BlobType) { - log.Debugf("Rejecting CreateSession from %s, unsupported blob "+ - "type %s", id, req.BlobType) - return s.replyCreateSession( - peer, id, wtwire.CreateSessionCodeRejectBlobType, nil, - ) - } - - // TODO(conner): create invoice for upfront payment - - // Assemble the session info using the agreed upon parameters, reward - // address, and session id. - info := wtdb.SessionInfo{ - ID: *id, - Policy: wtpolicy.Policy{ - BlobType: req.BlobType, - MaxUpdates: req.MaxUpdates, - RewardBase: req.RewardBase, - RewardRate: req.RewardRate, - SweepFeeRate: req.SweepFeeRate, - }, - RewardAddress: rewardScript, - } - - // Insert the session info into the watchtower's database. If - // successful, the session will now be ready for use. - err = s.cfg.DB.InsertSessionInfo(&info) - if err != nil { - log.Errorf("unable to create session for %s", id) - return s.replyCreateSession( - peer, id, wtwire.CodeTemporaryFailure, nil, - ) - } - - log.Infof("Accepted session for %s", id) - - return s.replyCreateSession( - peer, id, wtwire.CodeOK, rewardScript, - ) -} - -// handleStateUpdate processes a StateUpdate message request from a client. An -// attempt will be made to insert the update into the db, where it is validated -// against the client's session. The possible errors are then mapped back to -// StateUpdateCodes specified by the watchtower wire protocol, and sent back -// using a StateUpdateReply message. -func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID, - update *wtwire.StateUpdate) error { - - var ( - lastApplied uint16 - failCode wtwire.ErrorCode - err error - ) - - sessionUpdate := wtdb.SessionStateUpdate{ - ID: *id, - Hint: update.Hint, - SeqNum: update.SeqNum, - LastApplied: update.LastApplied, - EncryptedBlob: update.EncryptedBlob, - } - - lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate) - switch { - case err == nil: - log.Debugf("State update %d accepted for %s", - update.SeqNum, id) - - failCode = wtwire.CodeOK - - // Return a permanent failure if a client tries to send an update for - // which we have no session. - case err == wtdb.ErrSessionNotFound: - failCode = wtwire.CodePermanentFailure - - case err == wtdb.ErrSeqNumAlreadyApplied: - failCode = wtwire.CodePermanentFailure - - // TODO(conner): remove session state for protocol - // violation. Could also double as clean up method for - // session-related state. - - case err == wtdb.ErrLastAppliedReversion: - failCode = wtwire.StateUpdateCodeClientBehind - - case err == wtdb.ErrSessionConsumed: - failCode = wtwire.StateUpdateCodeMaxUpdatesExceeded - - case err == wtdb.ErrUpdateOutOfOrder: - failCode = wtwire.StateUpdateCodeSeqNumOutOfOrder default: - failCode = wtwire.CodeTemporaryFailure + log.Errorf("Received unsupported message type: %T "+ + "from %s", nextMsg, id) } - - if s.cfg.NoAckUpdates { - return &connFailure{ - ID: *id, - Code: uint16(failCode), - } - } - - return s.replyStateUpdate( - peer, id, failCode, lastApplied, - ) } // connFailure is a default error used when a request failed with a non-zero @@ -493,66 +286,6 @@ func (f *connFailure) Error() string { ) } -// replyCreateSession sends a response to a CreateSession from a client. If the -// status code in the reply is OK, the error from the write will be bubbled up. -// Otherwise, this method returns a connection error to ensure we don't continue -// communication with the client. -func (s *Server) replyCreateSession(peer Peer, id *wtdb.SessionID, - code wtwire.ErrorCode, data []byte) error { - - msg := &wtwire.CreateSessionReply{ - Code: code, - Data: data, - } - - err := s.sendMessage(peer, msg) - if err != nil { - log.Errorf("unable to send CreateSessionReply to %s", id) - } - - // Return the write error if the request succeeded. - if code == wtwire.CodeOK { - return err - } - - // Otherwise the request failed, return a connection failure to - // disconnect the client. - return &connFailure{ - ID: *id, - Code: uint16(code), - } -} - -// replyStateUpdate sends a response to a StateUpdate from a client. If the -// status code in the reply is OK, the error from the write will be bubbled up. -// Otherwise, this method returns a connection error to ensure we don't continue -// communication with the client. -func (s *Server) replyStateUpdate(peer Peer, id *wtdb.SessionID, - code wtwire.StateUpdateCode, lastApplied uint16) error { - - msg := &wtwire.StateUpdateReply{ - Code: code, - LastApplied: lastApplied, - } - - err := s.sendMessage(peer, msg) - if err != nil { - log.Errorf("unable to send StateUpdateReply to %s", id) - } - - // Return the write error if the request succeeded. - if code == wtwire.CodeOK { - return err - } - - // Otherwise the request failed, return a connection failure to - // disconnect the client. - return &connFailure{ - ID: *id, - Code: uint16(code), - } -} - // readMessage receives and parses the next message from the given Peer. An // error is returned if a message is not received before the server's read // timeout, the read off the wire failed, or the message could not be diff --git a/watchtower/wtserver/state_update.go b/watchtower/wtserver/state_update.go new file mode 100644 index 00000000..7b3e0941 --- /dev/null +++ b/watchtower/wtserver/state_update.go @@ -0,0 +1,157 @@ +package wtserver + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/watchtower/wtdb" + "github.com/lightningnetwork/lnd/watchtower/wtwire" +) + +// handleStateUpdates processes a stream of StateUpdate requests from the +// client. The provided update should be the first such update read, subsequent +// updates will be consumed if the peer does not signal IsComplete on a +// particular update. +func (s *Server) handleStateUpdates(peer Peer, id *wtdb.SessionID, + update *wtwire.StateUpdate) error { + + // Set the current update to the first update read off the wire. + // Additional updates will be read if this value is set to nil after + // processing the first. + var curUpdate = update + for { + // If this is not the first update, read the next state update + // from the peer. + if curUpdate == nil { + nextMsg, err := s.readMessage(peer) + if err != nil { + return err + } + + var ok bool + curUpdate, ok = nextMsg.(*wtwire.StateUpdate) + if !ok { + return fmt.Errorf("client sent %T after "+ + "StateUpdate", nextMsg) + } + } + + // Try to accept the state update from the client. + err := s.handleStateUpdate(peer, id, curUpdate) + if err != nil { + return err + } + + // If the client signals that this is last StateUpdate + // message, we can disconnect the client. + if curUpdate.IsComplete == 1 { + return nil + } + + // Reset the current update to read subsequent updates in the + // stream. + curUpdate = nil + + select { + case <-s.quit: + return ErrServerExiting + default: + } + } +} + +// handleStateUpdate processes a StateUpdate message request from a client. An +// attempt will be made to insert the update into the db, where it is validated +// against the client's session. The possible errors are then mapped back to +// StateUpdateCodes specified by the watchtower wire protocol, and sent back +// using a StateUpdateReply message. +func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID, + update *wtwire.StateUpdate) error { + + var ( + lastApplied uint16 + failCode wtwire.ErrorCode + err error + ) + + sessionUpdate := wtdb.SessionStateUpdate{ + ID: *id, + Hint: update.Hint, + SeqNum: update.SeqNum, + LastApplied: update.LastApplied, + EncryptedBlob: update.EncryptedBlob, + } + + lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate) + switch { + case err == nil: + log.Debugf("State update %d accepted for %s", + update.SeqNum, id) + + failCode = wtwire.CodeOK + + // Return a permanent failure if a client tries to send an update for + // which we have no session. + case err == wtdb.ErrSessionNotFound: + failCode = wtwire.CodePermanentFailure + + case err == wtdb.ErrSeqNumAlreadyApplied: + failCode = wtwire.CodePermanentFailure + + // TODO(conner): remove session state for protocol + // violation. Could also double as clean up method for + // session-related state. + + case err == wtdb.ErrLastAppliedReversion: + failCode = wtwire.StateUpdateCodeClientBehind + + case err == wtdb.ErrSessionConsumed: + failCode = wtwire.StateUpdateCodeMaxUpdatesExceeded + + case err == wtdb.ErrUpdateOutOfOrder: + failCode = wtwire.StateUpdateCodeSeqNumOutOfOrder + + default: + failCode = wtwire.CodeTemporaryFailure + } + + if s.cfg.NoAckUpdates { + return &connFailure{ + ID: *id, + Code: uint16(failCode), + } + } + + return s.replyStateUpdate( + peer, id, failCode, lastApplied, + ) +} + +// replyStateUpdate sends a response to a StateUpdate from a client. If the +// status code in the reply is OK, the error from the write will be bubbled up. +// Otherwise, this method returns a connection error to ensure we don't continue +// communication with the client. +func (s *Server) replyStateUpdate(peer Peer, id *wtdb.SessionID, + code wtwire.StateUpdateCode, lastApplied uint16) error { + + msg := &wtwire.StateUpdateReply{ + Code: code, + LastApplied: lastApplied, + } + + err := s.sendMessage(peer, msg) + if err != nil { + log.Errorf("unable to send StateUpdateReply to %s", id) + } + + // Return the write error if the request succeeded. + if code == wtwire.CodeOK { + return err + } + + // Otherwise the request failed, return a connection failure to + // disconnect the client. + return &connFailure{ + ID: *id, + Code: uint16(code), + } +}