Merge pull request #2320 from cfromknecht/wtpolicy

[watchtower] add wtpolicy.Policy and wtserver message logging
This commit is contained in:
Olaoluwa Osuntokun 2019-01-16 14:05:59 -08:00 committed by GitHub
commit 099d260318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 17 deletions

@ -188,9 +188,15 @@ func (s *Server) handleClient(peer Peer) {
peer.Close()
return
}
defer s.removePeer(&id)
defer s.removePeer(&id, peer.RemoteAddr())
msg, err := s.readMessage(peer)
if err != nil {
log.Errorf("Unable to read message from client %s@%s: %v",
id, peer.RemoteAddr(), err)
return
}
remoteInit, ok := msg.(*wtwire.Init)
if !ok {
log.Errorf("Client %s@%s did not send Init msg as first "+
@ -246,12 +252,6 @@ func (s *Server) handleClient(peer Peer) {
return
}
log.Infof("Received CreateSession from %s, "+
"version=%d nupdates=%d rewardrate=%d "+
"sweepfeerate=%d", id, msg.BlobType,
msg.MaxUpdates, msg.RewardRate,
msg.SweepFeeRate)
// Attempt to open a new session for this client.
err := s.handleCreateSession(peer, &id, msg)
if err != nil {
@ -265,11 +265,6 @@ func (s *Server) handleClient(peer Peer) {
// A StateUpdate indicates an existing client attempting to
// back-up a revoked commitment state.
case *wtwire.StateUpdate:
log.Infof("Received SessionUpdate from %s, seqnum=%d "+
"lastapplied=%d complete=%v hint=%x", id,
msg.SeqNum, msg.LastApplied, msg.IsComplete,
msg.Hint[:])
// Try to accept the state update from the client.
err := s.handleStateUpdate(peer, &id, msg)
if err != nil {
@ -437,7 +432,7 @@ func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID,
lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate)
switch {
case err == nil:
log.Infof("State update %d accepted for %s",
log.Debugf("State update %d accepted for %s",
update.SeqNum, id)
failCode = wtwire.CodeOK
@ -568,13 +563,15 @@ func (s *Server) readMessage(peer Peer) (wtwire.Message, error) {
}
msgReader := bytes.NewReader(rawMsg)
nextMsg, err := wtwire.ReadMessage(msgReader, 0)
msg, err := wtwire.ReadMessage(msgReader, 0)
if err != nil {
err = fmt.Errorf("unable to parse message: %v", err)
return nil, err
}
return nextMsg, nil
logMessage(peer, msg, true)
return msg, nil
}
// sendMessage sends a watchtower wire message to the target peer.
@ -594,6 +591,8 @@ func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error {
return err
}
logMessage(peer, msg, false)
_, err = peer.Write(b.Bytes())
return err
}
@ -619,8 +618,8 @@ func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error {
// removePeer deletes a client from the server's client map. If a peer is found,
// this method will close the peer's connection.
func (s *Server) removePeer(id *wtdb.SessionID) {
log.Infof("Releasing incoming peer %s", id)
func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) {
log.Infof("Releasing incoming peer %s@%s", id, addr)
s.clientMtx.Lock()
peer, ok := s.clients[*id]
@ -632,6 +631,27 @@ func (s *Server) removePeer(id *wtdb.SessionID) {
}
}
// logMessage writes information about a message exchanged with a remote peer,
// using directional prepositions to signal whether the message was sent or
// received.
func logMessage(peer Peer, msg wtwire.Message, read bool) {
var action = "Received"
var preposition = "from"
if !read {
action = "Sending"
preposition = "to"
}
summary := wtwire.MessageSummary(msg)
if len(summary) > 0 {
summary = "(" + summary + ")"
}
log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
preposition, peer.RemotePub().SerializeCompressed(),
peer.RemoteAddr())
}
// noDial is a dummy dial method passed to the server's connmgr.
func noDial(_ net.Addr) (net.Conn, error) {
return nil, fmt.Errorf("watchtower cannot make outgoing conns")

@ -0,0 +1,35 @@
package wtwire
import "fmt"
// MessageSummary creates a human-readable description of a given Message. If
// the type is unknown, an empty string is returned.
func MessageSummary(msg Message) string {
switch msg := msg.(type) {
case *Init:
return ""
case *CreateSession:
return fmt.Sprintf("blob_type=%s, max_updates=%d "+
"reward_rate=%d sweep_fee_rate=%d", msg.BlobType,
msg.MaxUpdates, msg.RewardRate, msg.SweepFeeRate)
case *CreateSessionReply:
return fmt.Sprintf("code=%d", msg.Code)
case *StateUpdate:
return fmt.Sprintf("seqnum=%d last_applied=%d is_complete=%d "+
"hint=%x", msg.SeqNum, msg.LastApplied, msg.IsComplete,
msg.Hint)
case *StateUpdateReply:
return fmt.Sprintf("code=%d last_applied=%d", msg.Code,
msg.LastApplied)
case *Error:
return fmt.Sprintf("code=%d", msg.Code)
default:
return ""
}
}