watchtower/wtserver/server: log sent/recvd wire messages
This commit is contained in:
parent
00db396b51
commit
4a2fb1b4d0
@ -188,7 +188,7 @@ func (s *Server) handleClient(peer Peer) {
|
|||||||
peer.Close()
|
peer.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer s.removePeer(&id)
|
defer s.removePeer(&id, peer.RemoteAddr())
|
||||||
|
|
||||||
msg, err := s.readMessage(peer)
|
msg, err := s.readMessage(peer)
|
||||||
remoteInit, ok := msg.(*wtwire.Init)
|
remoteInit, ok := msg.(*wtwire.Init)
|
||||||
@ -246,12 +246,6 @@ func (s *Server) handleClient(peer Peer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Received CreateSession from %s, "+
|
|
||||||
"version=%d nupdates=%d rewardrate=%d "+
|
|
||||||
"sweepfeerate=%d", id, msg.BlobType,
|
|
||||||
msg.MaxUpdates, msg.RewardRate,
|
|
||||||
msg.SweepFeeRate)
|
|
||||||
|
|
||||||
// Attempt to open a new session for this client.
|
// Attempt to open a new session for this client.
|
||||||
err := s.handleCreateSession(peer, &id, msg)
|
err := s.handleCreateSession(peer, &id, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -265,11 +259,6 @@ func (s *Server) handleClient(peer Peer) {
|
|||||||
// A StateUpdate indicates an existing client attempting to
|
// A StateUpdate indicates an existing client attempting to
|
||||||
// back-up a revoked commitment state.
|
// back-up a revoked commitment state.
|
||||||
case *wtwire.StateUpdate:
|
case *wtwire.StateUpdate:
|
||||||
log.Infof("Received SessionUpdate from %s, seqnum=%d "+
|
|
||||||
"lastapplied=%d complete=%v hint=%x", id,
|
|
||||||
msg.SeqNum, msg.LastApplied, msg.IsComplete,
|
|
||||||
msg.Hint[:])
|
|
||||||
|
|
||||||
// Try to accept the state update from the client.
|
// Try to accept the state update from the client.
|
||||||
err := s.handleStateUpdate(peer, &id, msg)
|
err := s.handleStateUpdate(peer, &id, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -437,7 +426,7 @@ func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID,
|
|||||||
lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate)
|
lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate)
|
||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
log.Infof("State update %d accepted for %s",
|
log.Debugf("State update %d accepted for %s",
|
||||||
update.SeqNum, id)
|
update.SeqNum, id)
|
||||||
|
|
||||||
failCode = wtwire.CodeOK
|
failCode = wtwire.CodeOK
|
||||||
@ -568,13 +557,15 @@ func (s *Server) readMessage(peer Peer) (wtwire.Message, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
msgReader := bytes.NewReader(rawMsg)
|
msgReader := bytes.NewReader(rawMsg)
|
||||||
nextMsg, err := wtwire.ReadMessage(msgReader, 0)
|
msg, err := wtwire.ReadMessage(msgReader, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("unable to parse message: %v", err)
|
err = fmt.Errorf("unable to parse message: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nextMsg, nil
|
logMessage(peer, msg, true)
|
||||||
|
|
||||||
|
return msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendMessage sends a watchtower wire message to the target peer.
|
// sendMessage sends a watchtower wire message to the target peer.
|
||||||
@ -594,6 +585,8 @@ func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logMessage(peer, msg, false)
|
||||||
|
|
||||||
_, err = peer.Write(b.Bytes())
|
_, err = peer.Write(b.Bytes())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -619,8 +612,8 @@ func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error {
|
|||||||
|
|
||||||
// removePeer deletes a client from the server's client map. If a peer is found,
|
// removePeer deletes a client from the server's client map. If a peer is found,
|
||||||
// this method will close the peer's connection.
|
// this method will close the peer's connection.
|
||||||
func (s *Server) removePeer(id *wtdb.SessionID) {
|
func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) {
|
||||||
log.Infof("Releasing incoming peer %s", id)
|
log.Infof("Releasing incoming peer %s@%s", id, addr)
|
||||||
|
|
||||||
s.clientMtx.Lock()
|
s.clientMtx.Lock()
|
||||||
peer, ok := s.clients[*id]
|
peer, ok := s.clients[*id]
|
||||||
@ -632,6 +625,27 @@ func (s *Server) removePeer(id *wtdb.SessionID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// logMessage writes information about a message exchanged with a remote peer,
|
||||||
|
// using directional prepositions to signal whether the message was sent or
|
||||||
|
// received.
|
||||||
|
func logMessage(peer Peer, msg wtwire.Message, read bool) {
|
||||||
|
var action = "Received"
|
||||||
|
var preposition = "from"
|
||||||
|
if !read {
|
||||||
|
action = "Sending"
|
||||||
|
preposition = "to"
|
||||||
|
}
|
||||||
|
|
||||||
|
summary := wtwire.MessageSummary(msg)
|
||||||
|
if len(summary) > 0 {
|
||||||
|
summary = "(" + summary + ")"
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
|
||||||
|
preposition, peer.RemotePub().SerializeCompressed(),
|
||||||
|
peer.RemoteAddr())
|
||||||
|
}
|
||||||
|
|
||||||
// noDial is a dummy dial method passed to the server's connmgr.
|
// noDial is a dummy dial method passed to the server's connmgr.
|
||||||
func noDial(_ net.Addr) (net.Conn, error) {
|
func noDial(_ net.Addr) (net.Conn, error) {
|
||||||
return nil, fmt.Errorf("watchtower cannot make outgoing conns")
|
return nil, fmt.Errorf("watchtower cannot make outgoing conns")
|
||||||
|
Loading…
Reference in New Issue
Block a user