From 00db396b51916cd39786fa45067b7bd7218e4c59 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 8 Jan 2019 00:13:20 -0800 Subject: [PATCH 1/3] watchtower/wtwire/summary: adds message summaries --- watchtower/wtwire/summary.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 watchtower/wtwire/summary.go diff --git a/watchtower/wtwire/summary.go b/watchtower/wtwire/summary.go new file mode 100644 index 00000000..32f682b7 --- /dev/null +++ b/watchtower/wtwire/summary.go @@ -0,0 +1,35 @@ +package wtwire + +import "fmt" + +// MessageSummary creates a human-readable description of a given Message. If +// the type is unknown, an empty string is returned. +func MessageSummary(msg Message) string { + switch msg := msg.(type) { + case *Init: + return "" + + case *CreateSession: + return fmt.Sprintf("blob_type=%s, max_updates=%d "+ + "reward_rate=%d sweep_fee_rate=%d", msg.BlobType, + msg.MaxUpdates, msg.RewardRate, msg.SweepFeeRate) + + case *CreateSessionReply: + return fmt.Sprintf("code=%d", msg.Code) + + case *StateUpdate: + return fmt.Sprintf("seqnum=%d last_applied=%d is_complete=%d "+ + "hint=%x", msg.SeqNum, msg.LastApplied, msg.IsComplete, + msg.Hint) + + case *StateUpdateReply: + return fmt.Sprintf("code=%d last_applied=%d", msg.Code, + msg.LastApplied) + + case *Error: + return fmt.Sprintf("code=%d", msg.Code) + + default: + return "" + } +} From 4a2fb1b4d06df07658526fe3e8a012e92ca236c9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 8 Jan 2019 00:14:07 -0800 Subject: [PATCH 2/3] watchtower/wtserver/server: log sent/recvd wire messages --- watchtower/wtserver/server.go | 48 ++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/watchtower/wtserver/server.go b/watchtower/wtserver/server.go index 1218b1f5..496c21f3 100644 --- a/watchtower/wtserver/server.go +++ b/watchtower/wtserver/server.go @@ -188,7 +188,7 @@ func (s *Server) handleClient(peer Peer) { peer.Close() return } - defer s.removePeer(&id) + defer s.removePeer(&id, peer.RemoteAddr()) msg, err := s.readMessage(peer) remoteInit, ok := msg.(*wtwire.Init) @@ -246,12 +246,6 @@ func (s *Server) handleClient(peer Peer) { return } - log.Infof("Received CreateSession from %s, "+ - "version=%d nupdates=%d rewardrate=%d "+ - "sweepfeerate=%d", id, msg.BlobType, - msg.MaxUpdates, msg.RewardRate, - msg.SweepFeeRate) - // Attempt to open a new session for this client. err := s.handleCreateSession(peer, &id, msg) if err != nil { @@ -265,11 +259,6 @@ func (s *Server) handleClient(peer Peer) { // A StateUpdate indicates an existing client attempting to // back-up a revoked commitment state. case *wtwire.StateUpdate: - log.Infof("Received SessionUpdate from %s, seqnum=%d "+ - "lastapplied=%d complete=%v hint=%x", id, - msg.SeqNum, msg.LastApplied, msg.IsComplete, - msg.Hint[:]) - // Try to accept the state update from the client. err := s.handleStateUpdate(peer, &id, msg) if err != nil { @@ -437,7 +426,7 @@ func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID, lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate) switch { case err == nil: - log.Infof("State update %d accepted for %s", + log.Debugf("State update %d accepted for %s", update.SeqNum, id) failCode = wtwire.CodeOK @@ -568,13 +557,15 @@ func (s *Server) readMessage(peer Peer) (wtwire.Message, error) { } msgReader := bytes.NewReader(rawMsg) - nextMsg, err := wtwire.ReadMessage(msgReader, 0) + msg, err := wtwire.ReadMessage(msgReader, 0) if err != nil { err = fmt.Errorf("unable to parse message: %v", err) return nil, err } - return nextMsg, nil + logMessage(peer, msg, true) + + return msg, nil } // sendMessage sends a watchtower wire message to the target peer. @@ -594,6 +585,8 @@ func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error { return err } + logMessage(peer, msg, false) + _, err = peer.Write(b.Bytes()) return err } @@ -619,8 +612,8 @@ func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error { // removePeer deletes a client from the server's client map. If a peer is found, // this method will close the peer's connection. -func (s *Server) removePeer(id *wtdb.SessionID) { - log.Infof("Releasing incoming peer %s", id) +func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) { + log.Infof("Releasing incoming peer %s@%s", id, addr) s.clientMtx.Lock() peer, ok := s.clients[*id] @@ -632,6 +625,27 @@ func (s *Server) removePeer(id *wtdb.SessionID) { } } +// logMessage writes information about a message exchanged with a remote peer, +// using directional prepositions to signal whether the message was sent or +// received. +func logMessage(peer Peer, msg wtwire.Message, read bool) { + var action = "Received" + var preposition = "from" + if !read { + action = "Sending" + preposition = "to" + } + + summary := wtwire.MessageSummary(msg) + if len(summary) > 0 { + summary = "(" + summary + ")" + } + + log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary, + preposition, peer.RemotePub().SerializeCompressed(), + peer.RemoteAddr()) +} + // noDial is a dummy dial method passed to the server's connmgr. func noDial(_ net.Addr) (net.Conn, error) { return nil, fmt.Errorf("watchtower cannot make outgoing conns") From 45cfa65b479f5479e121e9e1ef1493e12e0122f2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 15 Jan 2019 19:46:04 -0800 Subject: [PATCH 3/3] watchtower/wtserver/server: handle missed err case from read msg --- watchtower/wtserver/server.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/watchtower/wtserver/server.go b/watchtower/wtserver/server.go index 496c21f3..3f09e07a 100644 --- a/watchtower/wtserver/server.go +++ b/watchtower/wtserver/server.go @@ -191,6 +191,12 @@ func (s *Server) handleClient(peer Peer) { defer s.removePeer(&id, peer.RemoteAddr()) msg, err := s.readMessage(peer) + if err != nil { + log.Errorf("Unable to read message from client %s@%s: %v", + id, peer.RemoteAddr(), err) + return + } + remoteInit, ok := msg.(*wtwire.Init) if !ok { log.Errorf("Client %s@%s did not send Init msg as first "+