diff --git a/chanacceptor/acceptor_test.go b/chanacceptor/acceptor_test.go index 36c7d6b3..a85084d2 100644 --- a/chanacceptor/acceptor_test.go +++ b/chanacceptor/acceptor_test.go @@ -1,156 +1,207 @@ package chanacceptor import ( - "bytes" - "sync/atomic" + "errors" + "math/big" "testing" "time" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/assert" ) -func randKey(t *testing.T) *btcec.PublicKey { - t.Helper() +const testTimeout = time.Second - priv, err := btcec.NewPrivateKey(btcec.S256()) - if err != nil { - t.Fatalf("unable to generate new public key") - } +type channelAcceptorCtx struct { + t *testing.T - return priv.PubKey() + // extRequests is the channel that we send our channel accept requests + // into, this channel mocks sending of a request to the rpc acceptor. + // This channel should be buffered with the number of requests we want + // to send so that it does not block (like a rpc stream). + extRequests chan []byte + + // responses is a map of pending channel IDs to the response which we + // wish to mock the remote channel acceptor sending. + responses map[[32]byte]*lnrpc.ChannelAcceptResponse + + // acceptor is the channel acceptor we create for the test. + acceptor *RPCAcceptor + + // errChan is a channel that the error the channel acceptor exits with + // is sent into. + errChan chan error + + // quit is a channel that can be used to shutdown the channel acceptor + // and return errShuttingDown. + quit chan struct{} } -// requestInfo encapsulates the information sent from the RPCAcceptor to the -// receiver on the other end of the stream. -type requestInfo struct { - chanReq *ChannelAcceptRequest - responseChan chan lnrpc.ChannelAcceptResponse -} +func newChanAcceptorCtx(t *testing.T, acceptCallCount int, + responses map[[32]byte]*lnrpc.ChannelAcceptResponse) *channelAcceptorCtx { -var defaultAcceptTimeout = 5 * time.Second - -func acceptAndIncrementCtr(rpc ChannelAcceptor, req *ChannelAcceptRequest, - ctr *uint32, success chan struct{}) { - - result := rpc.Accept(req) - if !result { - return + testCtx := &channelAcceptorCtx{ + t: t, + extRequests: make(chan []byte, acceptCallCount), + responses: responses, + errChan: make(chan error), + quit: make(chan struct{}), } - val := atomic.AddUint32(ctr, 1) - if val == 3 { - success <- struct{}{} - } -} - -// TestMultipleRPCClients tests that the RPCAcceptor is able to handle multiple -// callers to its Accept method and respond to them correctly. -func TestRPCMultipleAcceptClients(t *testing.T) { - - var ( - node = randKey(t) - - firstOpenReq = &ChannelAcceptRequest{ - Node: node, - OpenChanMsg: &lnwire.OpenChannel{ - PendingChannelID: [32]byte{0}, - }, - } - - secondOpenReq = &ChannelAcceptRequest{ - Node: node, - OpenChanMsg: &lnwire.OpenChannel{ - PendingChannelID: [32]byte{1}, - }, - } - - thirdOpenReq = &ChannelAcceptRequest{ - Node: node, - OpenChanMsg: &lnwire.OpenChannel{ - PendingChannelID: [32]byte{2}, - }, - } - - counter uint32 + testCtx.acceptor = NewRPCAcceptor( + testCtx.receiveResponse, testCtx.sendRequest, testTimeout*5, + testCtx.quit, ) - quit := make(chan struct{}) - defer close(quit) + return testCtx +} - // Create channels to handle requests and successes. - requests := make(chan *requestInfo) - successChan := make(chan struct{}) - errChan := make(chan struct{}, 4) +// sendRequest mocks sending a request to the channel acceptor. +func (c *channelAcceptorCtx) sendRequest(request *lnrpc.ChannelAcceptRequest) error { + select { + case c.extRequests <- request.PendingChanId: - // demultiplexReq is a closure used to abstract the RPCAcceptor's request - // and response logic. - demultiplexReq := func(req *ChannelAcceptRequest) bool { - respChan := make(chan lnrpc.ChannelAcceptResponse, 1) - - newRequest := &requestInfo{ - chanReq: req, - responseChan: respChan, - } - - // Send the newRequest to the requests channel. - select { - case requests <- newRequest: - case <-quit: - return false - } - - // Receive the response and verify that the PendingChanId matches - // the ID found in the ChannelAcceptRequest. If no response has been - // received in defaultAcceptTimeout, then return false. - select { - case resp := <-respChan: - pendingID := req.OpenChanMsg.PendingChannelID - if !bytes.Equal(pendingID[:], resp.PendingChanId) { - errChan <- struct{}{} - return false - } - - return resp.Accept - case <-time.After(defaultAcceptTimeout): - errChan <- struct{}{} - return false - case <-quit: - return false - } + case <-time.After(testTimeout): + c.t.Fatalf("timeout sending request: %v", request.PendingChanId) } - rpcAcceptor := NewRPCAcceptor(demultiplexReq) + return nil +} - // Now we call the Accept method for each request. +// receiveResponse mocks sending of a response from the channel acceptor. +func (c *channelAcceptorCtx) receiveResponse() (*lnrpc.ChannelAcceptResponse, + error) { + + select { + case id := <-c.extRequests: + scratch := [32]byte{} + copy(scratch[:], id) + + resp, ok := c.responses[scratch] + assert.True(c.t, ok) + + return resp, nil + + case <-time.After(testTimeout): + c.t.Fatalf("timeout receiving request") + return nil, errors.New("receiveResponse timeout") + + // Exit if our test acceptor closes the done channel, which indicates + // that the acceptor is shutting down. + case <-c.acceptor.done: + return nil, errors.New("acceptor shutting down") + } +} + +// start runs our channel acceptor in a goroutine which sends its exit error +// into our test error channel. +func (c *channelAcceptorCtx) start() { go func() { - acceptAndIncrementCtr(rpcAcceptor, firstOpenReq, &counter, successChan) + c.errChan <- c.acceptor.Run() }() +} - go func() { - acceptAndIncrementCtr(rpcAcceptor, secondOpenReq, &counter, successChan) - }() +// stop shuts down the test's channel acceptor and asserts that it exits with +// our expected error. +func (c *channelAcceptorCtx) stop() { + close(c.quit) - go func() { - acceptAndIncrementCtr(rpcAcceptor, thirdOpenReq, &counter, successChan) - }() + select { + case actual := <-c.errChan: + assert.Equal(c.t, errShuttingDown, actual) - for { + case <-time.After(testTimeout): + c.t.Fatal("timeout waiting for acceptor to exit") + } +} + +// queryAndAssert takes a map of open channel requests which we want to call +// Accept for to the outcome we expect from the acceptor, dispatches each +// request in a goroutine and then asserts that we get the outcome we expect. +func (c *channelAcceptorCtx) queryAndAssert(queries map[*lnwire.OpenChannel]bool) { + var ( + node = &btcec.PublicKey{ + X: big.NewInt(1), + Y: big.NewInt(1), + } + + responses = make(chan struct{}) + ) + + for request, expected := range queries { + request := request + expected := expected + + go func() { + resp := c.acceptor.Accept(&ChannelAcceptRequest{ + Node: node, + OpenChanMsg: request, + }) + assert.Equal(c.t, expected, resp) + responses <- struct{}{} + }() + } + + // Wait for each of our requests to return a response before we exit. + for i := 0; i < len(queries); i++ { select { - case newRequest := <-requests: - newResponse := lnrpc.ChannelAcceptResponse{ - Accept: true, - PendingChanId: newRequest.chanReq.OpenChanMsg.PendingChannelID[:], - } - - newRequest.responseChan <- newResponse - case <-errChan: - t.Fatalf("unable to accept ChannelAcceptRequest") - case <-successChan: - return - case <-quit: + case <-responses: + case <-time.After(testTimeout): + c.t.Fatalf("did not receive response") } } } + +// TestMultipleAcceptClients tests that the RPC acceptor is capable of handling +// multiple requests to its Accept function and responding to them correctly. +func TestMultipleAcceptClients(t *testing.T) { + var ( + chan1 = &lnwire.OpenChannel{ + PendingChannelID: [32]byte{1}, + } + chan2 = &lnwire.OpenChannel{ + PendingChannelID: [32]byte{2}, + } + chan3 = &lnwire.OpenChannel{ + PendingChannelID: [32]byte{3}, + } + + // Queries is a map of the channel IDs we will query Accept + // with, and the set of outcomes we expect. + queries = map[*lnwire.OpenChannel]bool{ + chan1: true, + chan2: false, + chan3: false, + } + + // Responses is a mocked set of responses from the remote + // channel acceptor. + responses = map[[32]byte]*lnrpc.ChannelAcceptResponse{ + chan1.PendingChannelID: { + PendingChanId: chan1.PendingChannelID[:], + Accept: true, + }, + chan2.PendingChannelID: { + PendingChanId: chan2.PendingChannelID[:], + Accept: false, + }, + chan3.PendingChannelID: { + PendingChanId: chan3.PendingChannelID[:], + Accept: false, + }, + } + ) + + // Create and start our channel acceptor. + testCtx := newChanAcceptorCtx(t, len(queries), responses) + testCtx.start() + + // Dispatch three queries and assert that we get our expected response. + // for each. + testCtx.queryAndAssert(queries) + + // Shutdown our acceptor. + testCtx.stop() +} diff --git a/chanacceptor/log.go b/chanacceptor/log.go new file mode 100644 index 00000000..26e131c4 --- /dev/null +++ b/chanacceptor/log.go @@ -0,0 +1,32 @@ +package chanacceptor + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// Subsystem defines the logging code for this subsystem. +const Subsystem = "CHAC" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/chanacceptor/rpcacceptor.go b/chanacceptor/rpcacceptor.go index 9c4401d7..360db970 100644 --- a/chanacceptor/rpcacceptor.go +++ b/chanacceptor/rpcacceptor.go @@ -1,24 +1,255 @@ package chanacceptor +import ( + "errors" + "sync" + "time" + + "github.com/lightningnetwork/lnd/lnrpc" +) + +var errShuttingDown = errors.New("server shutting down") + +// chanAcceptInfo contains a request for a channel acceptor decision, and a +// channel that the response should be sent on. +type chanAcceptInfo struct { + request *ChannelAcceptRequest + response chan bool +} + // RPCAcceptor represents the RPC-controlled variant of the ChannelAcceptor. // One RPCAcceptor allows one RPC client. type RPCAcceptor struct { - acceptClosure func(req *ChannelAcceptRequest) bool + // receive is a function from which we receive channel acceptance + // decisions. Note that this function is expected to block. + receive func() (*lnrpc.ChannelAcceptResponse, error) + + // send is a function which sends requests for channel acceptance + // decisions into our rpc stream. + send func(request *lnrpc.ChannelAcceptRequest) error + + // requests is a channel that we send requests for a acceptor response + // into. + requests chan *chanAcceptInfo + + // timeout is the amount of time we allow the channel acceptance + // decision to take. This time includes the time to send a query to the + // acceptor, and the time it takes to receive a response. + timeout time.Duration + + // done is closed when the rpc client terminates. + done chan struct{} + + // quit is closed when lnd is shutting down. + quit chan struct{} + + wg sync.WaitGroup } // Accept is a predicate on the ChannelAcceptRequest which is sent to the RPC -// client who will respond with the ultimate decision. This assumes an accept -// closure has been specified during creation. +// client who will respond with the ultimate decision. This function passes the +// request into the acceptor's requests channel, and returns the response it +// receives, failing the request if the timeout elapses. // // NOTE: Part of the ChannelAcceptor interface. func (r *RPCAcceptor) Accept(req *ChannelAcceptRequest) bool { - return r.acceptClosure(req) + respChan := make(chan bool, 1) + + newRequest := &chanAcceptInfo{ + request: req, + response: respChan, + } + + // timeout is the time after which ChannelAcceptRequests expire. + timeout := time.After(r.timeout) + + // Send the request to the newRequests channel. + select { + case r.requests <- newRequest: + + case <-timeout: + log.Errorf("RPCAcceptor returned false - reached timeout of %v", + r.timeout) + return false + + case <-r.done: + return false + + case <-r.quit: + return false + } + + // Receive the response and return it. If no response has been received + // in AcceptorTimeout, then return false. + select { + case resp := <-respChan: + return resp + + case <-timeout: + log.Errorf("RPCAcceptor returned false - reached timeout of %v", + r.timeout) + return false + + case <-r.done: + return false + + case <-r.quit: + return false + } } // NewRPCAcceptor creates and returns an instance of the RPCAcceptor. -func NewRPCAcceptor(closure func(*ChannelAcceptRequest) bool) *RPCAcceptor { +func NewRPCAcceptor(receive func() (*lnrpc.ChannelAcceptResponse, error), + send func(*lnrpc.ChannelAcceptRequest) error, + timeout time.Duration, quit chan struct{}) *RPCAcceptor { + return &RPCAcceptor{ - acceptClosure: closure, + receive: receive, + send: send, + requests: make(chan *chanAcceptInfo), + timeout: timeout, + done: make(chan struct{}), + quit: quit, + } +} + +// Run is the main loop for the RPC Acceptor. This function will block until +// it receives the signal that lnd is shutting down, or the rpc stream is +// cancelled by the client. +func (r *RPCAcceptor) Run() error { + // Wait for our goroutines to exit before we return. + defer r.wg.Wait() + + // Create a channel that responses from acceptors are sent into. + responses := make(chan lnrpc.ChannelAcceptResponse) + + // errChan is used by the receive loop to signal any errors that occur + // during reading from the stream. This is primarily used to shutdown + // the send loop in the case of an RPC client disconnecting. + errChan := make(chan error, 1) + + // Start a goroutine to receive responses from the channel acceptor. + // We expect the receive function to block, so it must be run in a + // goroutine (otherwise we could not send more than one channel accept + // request to the client). + r.wg.Add(1) + go func() { + r.receiveResponses(errChan, responses) + r.wg.Done() + }() + + return r.sendAcceptRequests(errChan, responses) +} + +// receiveResponses receives responses for our channel accept requests and +// dispatches them into the responses channel provided, sending any errors that +// occur into the error channel provided. +func (r *RPCAcceptor) receiveResponses(errChan chan error, + responses chan lnrpc.ChannelAcceptResponse) { + + for { + resp, err := r.receive() + if err != nil { + errChan <- err + return + } + + var pendingID [32]byte + copy(pendingID[:], resp.PendingChanId) + + openChanResp := lnrpc.ChannelAcceptResponse{ + Accept: resp.Accept, + PendingChanId: pendingID[:], + } + + // We have received a decision for one of our channel + // acceptor requests. + select { + case responses <- openChanResp: + + case <-r.done: + return + + case <-r.quit: + return + } + } +} + +// sendAcceptRequests handles channel acceptor requests sent to us by our +// Accept() function, dispatching them to our acceptor stream and coordinating +// return of responses to their callers. +func (r *RPCAcceptor) sendAcceptRequests(errChan chan error, + responses chan lnrpc.ChannelAcceptResponse) error { + + // Close the done channel to indicate that the acceptor is no longer + // listening and any in-progress requests should be terminated. + defer close(r.done) + + acceptRequests := make(map[[32]byte]chan bool) + + for { + select { + // Consume requests passed to us from our Accept() function and + // send them into our stream. + case newRequest := <-r.requests: + + req := newRequest.request + pendingChanID := req.OpenChanMsg.PendingChannelID + + acceptRequests[pendingChanID] = newRequest.response + + // A ChannelAcceptRequest has been received, send it to the client. + chanAcceptReq := &lnrpc.ChannelAcceptRequest{ + NodePubkey: req.Node.SerializeCompressed(), + ChainHash: req.OpenChanMsg.ChainHash[:], + PendingChanId: req.OpenChanMsg.PendingChannelID[:], + FundingAmt: uint64(req.OpenChanMsg.FundingAmount), + PushAmt: uint64(req.OpenChanMsg.PushAmount), + DustLimit: uint64(req.OpenChanMsg.DustLimit), + MaxValueInFlight: uint64(req.OpenChanMsg.MaxValueInFlight), + ChannelReserve: uint64(req.OpenChanMsg.ChannelReserve), + MinHtlc: uint64(req.OpenChanMsg.HtlcMinimum), + FeePerKw: uint64(req.OpenChanMsg.FeePerKiloWeight), + CsvDelay: uint32(req.OpenChanMsg.CsvDelay), + MaxAcceptedHtlcs: uint32(req.OpenChanMsg.MaxAcceptedHTLCs), + ChannelFlags: uint32(req.OpenChanMsg.ChannelFlags), + } + + if err := r.send(chanAcceptReq); err != nil { + return err + } + + // Process newly received responses from our channel acceptor, + // looking the original request up in our map of requests and + // dispatching the response. + case resp := <-responses: + // Look up the appropriate channel to send on given the + // pending ID. If a channel is found, send the response + // over it. + var pendingID [32]byte + copy(pendingID[:], resp.PendingChanId) + respChan, ok := acceptRequests[pendingID] + if !ok { + continue + } + + // Send the response boolean over the buffered response + // channel. + respChan <- resp.Accept + + // Delete the channel from the acceptRequests map. + delete(acceptRequests, pendingID) + + // If we failed to receive from our acceptor, we exit. + case err := <-errChan: + log.Errorf("Received an error: %v, shutting down", err) + return err + + // Exit if we are shutting down. + case <-r.quit: + return errShuttingDown + } } } diff --git a/log.go b/log.go index e618fd7d..5da6c9ad 100644 --- a/log.go +++ b/log.go @@ -11,6 +11,7 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" + "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/chanfitness" "github.com/lightningnetwork/lnd/channeldb" @@ -133,6 +134,7 @@ func SetupLoggers(root *build.RotatingLogWriter) { AddSubLogger(root, verrpc.Subsystem, verrpc.UseLogger) AddSubLogger(root, healthcheck.Subsystem, healthcheck.UseLogger) AddSubLogger(root, chainreg.Subsystem, chainreg.UseLogger) + AddSubLogger(root, chanacceptor.Subsystem, chanacceptor.UseLogger) } // AddSubLogger is a helper method to conveniently create and register the diff --git a/rpcserver.go b/rpcserver.go index 8d39547f..57706243 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -6399,14 +6399,6 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription } } -// chanAcceptInfo is used in the ChannelAcceptor bidirectional stream and -// encapsulates the request information sent from the RPCAcceptor to the -// RPCServer. -type chanAcceptInfo struct { - chanReq *chanacceptor.ChannelAcceptRequest - responseChan chan bool -} - // ChannelAcceptor dispatches a bi-directional streaming RPC in which // OpenChannel requests are sent to the client and the client responds with // a boolean that tells LND whether or not to accept the channel. This allows @@ -6415,153 +6407,22 @@ type chanAcceptInfo struct { func (r *rpcServer) ChannelAcceptor(stream lnrpc.Lightning_ChannelAcceptorServer) error { chainedAcceptor := r.chanPredicate - // Create two channels to handle requests and responses respectively. - newRequests := make(chan *chanAcceptInfo) - responses := make(chan lnrpc.ChannelAcceptResponse) - - // Define a quit channel that will be used to signal to the RPCAcceptor's - // closure whether the stream still exists. - quit := make(chan struct{}) - defer close(quit) - - // demultiplexReq is a closure that will be passed to the RPCAcceptor and - // acts as an intermediary between the RPCAcceptor and the RPCServer. - demultiplexReq := func(req *chanacceptor.ChannelAcceptRequest) bool { - respChan := make(chan bool, 1) - - newRequest := &chanAcceptInfo{ - chanReq: req, - responseChan: respChan, - } - - // timeout is the time after which ChannelAcceptRequests expire. - timeout := time.After(r.cfg.AcceptorTimeout) - - // Send the request to the newRequests channel. - select { - case newRequests <- newRequest: - case <-timeout: - rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d", - r.cfg.AcceptorTimeout) - return false - case <-quit: - return false - case <-r.quit: - return false - } - - // Receive the response and return it. If no response has been received - // in AcceptorTimeout, then return false. - select { - case resp := <-respChan: - return resp - case <-timeout: - rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d", - r.cfg.AcceptorTimeout) - return false - case <-quit: - return false - case <-r.quit: - return false - } - } - - // Create a new RPCAcceptor via the NewRPCAcceptor method. - rpcAcceptor := chanacceptor.NewRPCAcceptor(demultiplexReq) + // Create a new RPCAcceptor which will send requests into the + // newRequests channel when it receives them. + rpcAcceptor := chanacceptor.NewRPCAcceptor( + stream.Recv, stream.Send, r.cfg.AcceptorTimeout, r.quit, + ) // Add the RPCAcceptor to the ChainedAcceptor and defer its removal. id := chainedAcceptor.AddAcceptor(rpcAcceptor) defer chainedAcceptor.RemoveAcceptor(id) - // errChan is used by the receive loop to signal any errors that occur - // during reading from the stream. This is primarily used to shutdown the - // send loop in the case of an RPC client disconnecting. - errChan := make(chan error, 1) - - // We need to have the stream.Recv() in a goroutine since the call is - // blocking and would prevent us from sending more ChannelAcceptRequests to - // the RPC client. - go func() { - for { - resp, err := stream.Recv() - if err != nil { - errChan <- err - return - } - - var pendingID [32]byte - copy(pendingID[:], resp.PendingChanId) - - openChanResp := lnrpc.ChannelAcceptResponse{ - Accept: resp.Accept, - PendingChanId: pendingID[:], - } - - // Now that we have the response from the RPC client, send it to - // the responses chan. - select { - case responses <- openChanResp: - case <-quit: - return - case <-r.quit: - return - } - } - }() - - acceptRequests := make(map[[32]byte]chan bool) - - for { - select { - case newRequest := <-newRequests: - - req := newRequest.chanReq - pendingChanID := req.OpenChanMsg.PendingChannelID - - acceptRequests[pendingChanID] = newRequest.responseChan - - // A ChannelAcceptRequest has been received, send it to the client. - chanAcceptReq := &lnrpc.ChannelAcceptRequest{ - NodePubkey: req.Node.SerializeCompressed(), - ChainHash: req.OpenChanMsg.ChainHash[:], - PendingChanId: req.OpenChanMsg.PendingChannelID[:], - FundingAmt: uint64(req.OpenChanMsg.FundingAmount), - PushAmt: uint64(req.OpenChanMsg.PushAmount), - DustLimit: uint64(req.OpenChanMsg.DustLimit), - MaxValueInFlight: uint64(req.OpenChanMsg.MaxValueInFlight), - ChannelReserve: uint64(req.OpenChanMsg.ChannelReserve), - MinHtlc: uint64(req.OpenChanMsg.HtlcMinimum), - FeePerKw: uint64(req.OpenChanMsg.FeePerKiloWeight), - CsvDelay: uint32(req.OpenChanMsg.CsvDelay), - MaxAcceptedHtlcs: uint32(req.OpenChanMsg.MaxAcceptedHTLCs), - ChannelFlags: uint32(req.OpenChanMsg.ChannelFlags), - } - - if err := stream.Send(chanAcceptReq); err != nil { - return err - } - case resp := <-responses: - // Look up the appropriate channel to send on given the pending ID. - // If a channel is found, send the response over it. - var pendingID [32]byte - copy(pendingID[:], resp.PendingChanId) - respChan, ok := acceptRequests[pendingID] - if !ok { - continue - } - - // Send the response boolean over the buffered response channel. - respChan <- resp.Accept - - // Delete the channel from the acceptRequests map. - delete(acceptRequests, pendingID) - case err := <-errChan: - rpcsLog.Errorf("Received an error: %v, shutting down", err) - return err - case <-r.quit: - return fmt.Errorf("RPC server is shutting down") - } - } + // Run the rpc acceptor, which will accept requests for channel + // acceptance decisions from our chained acceptor, send them to the + // channel acceptor and listen for and report responses. This function + // blocks, and will exit if the rpcserver receives the instruction to + // shutdown, or the client cancels. + return rpcAcceptor.Run() } // BakeMacaroon allows the creation of a new macaroon with custom read and write