server: optimize message broadcast+targeted send

This commit slightly optimizes the process of broadcasting a message to
a list of peers, and also sending a set of messages to a target peer.

When broadcasting a message to a set of target peers, we now launch a
goroutine for each send as to not block the ChannelRouter on an
individual send. When sending a set of messages to a target peer, we
now give up the mutex as soon as we’ve access the map, rather than
holding onto it until the sending is complete.
This commit is contained in:
Olaoluwa Osuntokun 2017-01-29 15:02:57 -08:00
parent d94777acf6
commit 1ee4c661bc
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 9 additions and 4 deletions

@ -531,6 +531,7 @@ func testSingleHopInvoice(net *networkHarness, t *harnessTest) {
// With the invoice for Bob added, send a payment towards Alice paying // With the invoice for Bob added, send a payment towards Alice paying
// to the above generated invoice. // to the above generated invoice.
time.Sleep(time.Millisecond * 500)
sendStream, err := net.Alice.SendPayment(ctxb) sendStream, err := net.Alice.SendPayment(ctxb)
if err != nil { if err != nil {
t.Fatalf("unable to create alice payment stream: %v", err) t.Fatalf("unable to create alice payment stream: %v", err)

@ -1735,6 +1735,8 @@ func (r *rpcServer) GetNetworkInfo(context.Context, *lnrpc.NetworkInfoRequest) (
return nil, err return nil, err
} }
// TODO(roasbeef): graph diameter
// TODO(roasbeef): also add oldest channel? // TODO(roasbeef): also add oldest channel?
// * also add median channel size // * also add median channel size
return &lnrpc.NetworkInfo{ return &lnrpc.NetworkInfo{

@ -594,9 +594,11 @@ out:
continue continue
} }
for _, msg := range bMsg.msgs { go func() {
peer.queueMsg(msg, nil) for _, msg := range bMsg.msgs {
} peer.queueMsg(msg, nil)
}
}()
} }
s.peersMtx.RUnlock() s.peersMtx.RUnlock()
@ -624,11 +626,11 @@ out:
sMsg.errChan <- errors.New("peer not found") sMsg.errChan <- errors.New("peer not found")
return return
} }
s.peersMtx.RUnlock()
for _, msg := range sMsg.msgs { for _, msg := range sMsg.msgs {
targetPeer.queueMsg(msg, nil) targetPeer.queueMsg(msg, nil)
} }
s.peersMtx.RUnlock()
sMsg.errChan <- nil sMsg.errChan <- nil
}() }()