From 24a69c116450f0b6780c6045de62a7239a029bd3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 13 Mar 2017 20:39:16 -0700 Subject: [PATCH] rpc: implement new streaming SubscribeChannelGraph RPC call This commit implements the new server-side streaming RPC call within the current default implementation of the RPC server. With this, the new functionality can now be used within the integration tests to achieve a greater degree of synchronization in the tests. As a result, we should be able to eliminate many of the sleeps lingering within the tests. --- rpcserver.go | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/rpcserver.go b/rpcserver.go index bbbfe381..9839b3c1 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -262,6 +262,8 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest, err error ) + // TODO(roasbeef): also return channel ID? + // If the node key is set, the we'll parse the raw bytes into a pubkey // object so we can easily manipulate it. If this isn't set, then we // expected the TargetPeerId to be set accordingly. @@ -1671,6 +1673,126 @@ func (r *rpcServer) GetNetworkInfo(context.Context, *lnrpc.NetworkInfoRequest) ( }, nil } +// SubscribeChannelGraph launches a streaming RPC that allows the caller to +// receive notifications upon any changes the channel graph topology from the +// review of the responding node. Events notified include: new nodes coming +// online, nodes updating their authenticated attributes, new channels being +// advertised, updates in the routing policy for a directional channel edge, +// and finally when prior channels are closed on-chain. +func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription, + updateStream lnrpc.Lightning_SubscribeChannelGraphServer) error { + + // First, we start by subscribing to a new intent to receive + // notifications from the channel router. + client, err := r.server.chanRouter.SubscribeTopology() + if err != nil { + return nil + } + + // Ensure that the resources for the topology update client is cleaned + // up once either the server, or client exists. + defer client.Cancel() + + for { + select { + + // A new update has been sent by the channel router, we'll + // marshall it into the form expected by the gRPC client, then + // send it off. + case topChange, ok := <-client.TopologyChanges: + // If the second value from the channel read is nil, + // then this means that the channel router is exiting + // or the notification client was cancelled. So we'll + // exit early. + if !ok { + return errors.New("sever shutting down") + } + + // Convert the struct from the channel router into the + // form expected by the gRPC service then send it off + // to the client. + graphUpdate := marshallTopologyChange(topChange) + if err := updateStream.Send(graphUpdate); err != nil { + return err + } + + // The server is quitting, so we'll exit immediately. Returning + // nil will close the clients read end of the stream. + case <-r.quit: + return nil + } + } + + return nil +} + +// marshallTopologyChange performs a mapping from the topology change sturct +// returned by the router to the form of notifications expected by the current +// gRPC service. +func marshallTopologyChange(topChange *routing.TopologyChange) *lnrpc.GraphTopologyUpdate { + + // encodeKey is a simple helper function that converts a live public + // key into a hex-encoded version of the compressed serialization for + // the public key. + encodeKey := func(k *btcec.PublicKey) string { + return hex.EncodeToString(k.SerializeCompressed()) + } + + nodeUpdates := make([]*lnrpc.NodeUpdate, len(topChange.NodeUpdates)) + for i, nodeUpdate := range topChange.NodeUpdates { + addrs := make([]string, len(nodeUpdate.Addresses)) + for i, addr := range nodeUpdate.Addresses { + addrs[i] = addr.String() + } + + nodeUpdates[i] = &lnrpc.NodeUpdate{ + Addresses: addrs, + IdentityKey: encodeKey(nodeUpdate.IdentityKey), + GlobalFeatures: nodeUpdate.GlobalFeatures, + Alias: nodeUpdate.Alias, + } + } + + channelUpdates := make([]*lnrpc.ChannelEdgeUpdate, len(topChange.ChannelEdgeUpdates)) + for i, channelUpdate := range topChange.ChannelEdgeUpdates { + channelUpdates[i] = &lnrpc.ChannelEdgeUpdate{ + ChanId: channelUpdate.ChanID, + ChanPoint: &lnrpc.ChannelPoint{ + FundingTxid: channelUpdate.ChanPoint.Hash[:], + OutputIndex: channelUpdate.ChanPoint.Index, + }, + Capacity: int64(channelUpdate.Capacity), + RoutingPolicy: &lnrpc.RoutingPolicy{ + TimeLockDelta: uint32(channelUpdate.TimeLockDelta), + MinHtlc: int64(channelUpdate.MinHTLC), + FeeBaseMsat: int64(channelUpdate.BaseFee), + FeeRateMilliMsat: int64(channelUpdate.FeeRate), + }, + AdvertisingNode: encodeKey(channelUpdate.AdvertisingNode), + ConnectingNode: encodeKey(channelUpdate.ConnectingNode), + } + } + + closedChans := make([]*lnrpc.ClosedChannelUpdate, len(topChange.ClosedChannels)) + for i, closedChan := range topChange.ClosedChannels { + closedChans[i] = &lnrpc.ClosedChannelUpdate{ + ChanId: closedChan.ChanID, + Capacity: int64(closedChan.Capacity), + ClosedHeight: closedChan.ClosedHeight, + ChanPoint: &lnrpc.ChannelPoint{ + FundingTxid: closedChan.ChanPoint.Hash[:], + OutputIndex: closedChan.ChanPoint.Index, + }, + } + } + + return &lnrpc.GraphTopologyUpdate{ + NodeUpdates: nodeUpdates, + ChannelUpdates: channelUpdates, + ClosedChans: closedChans, + } +} + // ListPayments returns a list of all outgoing payments. func (r *rpcServer) ListPayments(context.Context, *lnrpc.ListPaymentsRequest) (*lnrpc.ListPaymentsResponse, error) {