diff --git a/peer.go b/peer.go index 927247d1..d360749b 100644 --- a/peer.go +++ b/peer.go @@ -190,67 +190,9 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, quit: make(chan struct{}), } - // Fetch and then load all the active channels we have with this - // remote peer from the database. - activeChans, err := server.chanDB.FetchOpenChannels(p.addr.IdentityKey) - if err != nil { - peerLog.Errorf("unable to fetch active chans "+ - "for peer %v: %v", p, err) - return nil, err - } - peerLog.Debugf("Loaded %v active channels from database with peerID(%v)", - len(activeChans), p.id) - if err := p.loadActiveChannels(activeChans); err != nil { - return nil, err - } - return p, nil } -// loadActiveChannels creates indexes within the peer for tracking all active -// channels returned by the database. -func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { - for _, dbChan := range chans { - if dbChan.IsPending { - continue - } - - lnChan, err := lnwallet.NewLightningChannel(p.server.lnwallet.Signer, - p.server.chainNotifier, dbChan) - if err != nil { - return err - } - - chanPoint := *dbChan.ChanID - chanID := lnwire.NewChanIDFromOutPoint(&chanPoint) - - p.activeChanMtx.Lock() - p.activeChannels[chanID] = lnChan - p.activeChanMtx.Unlock() - - peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint) - - p.server.breachArbiter.newContracts <- lnChan - - // Register this new channel link with the HTLC Switch. This is - // necessary to properly route multi-hop payments, and forward - // new payments triggered by RPC clients. - downstreamLink := make(chan *htlcPacket, 10) - plexChan := p.server.htlcSwitch.RegisterLink(p, - dbChan.Snapshot(), downstreamLink) - - upstreamLink := make(chan lnwire.Message, 10) - p.htlcManMtx.Lock() - p.htlcManagers[chanID] = upstreamLink - p.htlcManMtx.Unlock() - - p.wg.Add(1) - go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink) - } - - return nil -} - // Start starts all helper goroutines the peer needs for normal operations. In // the case this peer has already been started, then this function is a loop. func (p *peer) Start() error { @@ -310,6 +252,24 @@ func (p *peer) Start() error { go p.channelManager() go p.pingHandler() + // Fetch and then load all the active channels we have with this remote + // peer from the database. + activeChans, err := p.server.chanDB.FetchOpenChannels(p.addr.IdentityKey) + if err != nil { + peerLog.Errorf("unable to fetch active chans "+ + "for peer %v: %v", p, err) + return err + } + + // Next, load all the active channels we have with this peer, + // registering them with the switch and launching the necessary + // goroutines required to operate them. + peerLog.Debugf("Loaded %v active channels from database with "+ + "peerID(%v)", len(activeChans), p.id) + if err := p.loadActiveChannels(activeChans); err != nil { + return fmt.Errorf("unable to load channels: %v", err) + } + return nil } @@ -321,13 +281,53 @@ func (p *peer) Stop() error { if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { return nil } +// loadActiveChannels creates indexes within the peer for tracking all active +// channels returned by the database. +func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { + for _, dbChan := range chans { + // If the channel isn't yet open, then we don't need to process + // it any further. + if dbChan.IsPending { + continue + } // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() + lnChan, err := lnwallet.NewLightningChannel(p.server.lnwallet.Signer, + p.server.chainNotifier, dbChan) + if err != nil { + return err + } // Signal all worker goroutines to gracefully exit. close(p.quit) p.wg.Wait() + chanPoint := *dbChan.ChanID + chanID := lnwire.NewChanIDFromOutPoint(&chanPoint) + + p.activeChanMtx.Lock() + p.activeChannels[chanID] = lnChan + p.activeChanMtx.Unlock() + + peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint) + + p.server.breachArbiter.newContracts <- lnChan + + // Register this new channel link with the HTLC Switch. This is + // necessary to properly route multi-hop payments, and forward + // new payments triggered by RPC clients. + downstreamLink := make(chan *htlcPacket, 10) + plexChan := p.server.htlcSwitch.RegisterLink(p, + dbChan.Snapshot(), downstreamLink) + + upstreamLink := make(chan lnwire.Message, 10) + p.htlcManMtx.Lock() + p.htlcManagers[chanID] = upstreamLink + p.htlcManMtx.Unlock() + + p.wg.Add(1) + go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink) + } return nil }