From ff0339a186debc95e53bd78ba343fa027fed95cf Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 22 Jan 2019 18:28:35 -0800 Subject: [PATCH] rpcserver: implement SubscribeChannels RPC call. --- rpcserver.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/rpcserver.go b/rpcserver.go index 933be798..b4326ca6 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -30,6 +30,7 @@ import ( "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/invoices" @@ -245,6 +246,10 @@ var ( Entity: "offchain", Action: "read", }}, + "/lnrpc.Lightning/SubscribeChannelEvents": {{ + Entity: "offchain", + Action: "read", + }}, "/lnrpc.Lightning/ClosedChannels": {{ Entity: "offchain", Action: "read", @@ -2474,6 +2479,84 @@ func createRPCClosedChannel( } } +// SubscribeChannelEvents returns a uni-directional stream (server -> client) +// for notifying the client of newly active, inactive or closed channels. +func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription, + updateStream lnrpc.Lightning_SubscribeChannelEventsServer) error { + + channelEventSub, err := r.server.channelNotifier.SubscribeChannelEvents() + if err != nil { + return err + } + + // Ensure that the resources for the client is cleaned up once either + // the server, or client exits. + defer channelEventSub.Cancel() + + graph := r.server.chanDB.ChannelGraph() + + for { + select { + // A new update has been sent by the channel router, we'll + // marshal it into the form expected by the gRPC client, then + // send it off to the client(s). + case e := <-channelEventSub.Updates(): + var update *lnrpc.ChannelEventUpdate + switch event := e.(type) { + case channelnotifier.OpenChannelEvent: + channel := createRPCOpenChannel(r, graph, + event.Channel, true) + update = &lnrpc.ChannelEventUpdate{ + Type: lnrpc.ChannelEventUpdate_OPEN_CHANNEL, + Channel: &lnrpc.ChannelEventUpdate_OpenChannel{ + OpenChannel: channel, + }, + } + case channelnotifier.ClosedChannelEvent: + closedChannel := createRPCClosedChannel(event.CloseSummary) + update = &lnrpc.ChannelEventUpdate{ + Type: lnrpc.ChannelEventUpdate_CLOSED_CHANNEL, + Channel: &lnrpc.ChannelEventUpdate_ClosedChannel{ + ClosedChannel: closedChannel, + }, + } + case channelnotifier.ActiveChannelEvent: + update = &lnrpc.ChannelEventUpdate{ + Type: lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL, + Channel: &lnrpc.ChannelEventUpdate_ActiveChannel{ + ActiveChannel: &lnrpc.ChannelPoint{ + FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ + FundingTxidBytes: event.ChannelPoint.Hash[:], + }, + OutputIndex: event.ChannelPoint.Index, + }, + }, + } + case channelnotifier.InactiveChannelEvent: + update = &lnrpc.ChannelEventUpdate{ + Type: lnrpc.ChannelEventUpdate_INACTIVE_CHANNEL, + Channel: &lnrpc.ChannelEventUpdate_InactiveChannel{ + InactiveChannel: &lnrpc.ChannelPoint{ + FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ + FundingTxidBytes: event.ChannelPoint.Hash[:], + }, + OutputIndex: event.ChannelPoint.Index, + }, + }, + } + default: + return fmt.Errorf("unexpected channel event update: %v", event) + } + + if err := updateStream.Send(update); err != nil { + return err + } + case <-r.quit: + return nil + } + } +} + // savePayment saves a successfully completed payment to the database for // historical record keeping. func (r *rpcServer) savePayment(route *routing.Route,