From 98f078aeb508a87e67da50862732e3b88fbc460e Mon Sep 17 00:00:00 2001 From: Shai Erera Date: Wed, 23 Dec 2015 12:16:50 +0000 Subject: [PATCH] SOLR-8455: Improve RecoveryStrategy logging and fix interval-between-recovery-attempts git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1721531 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../apache/solr/cloud/RecoveryStrategy.java | 204 ++++++++---------- 2 files changed, 97 insertions(+), 110 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1b74af12051..7adc7963fe4 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -378,6 +378,9 @@ Other Changes * SOLR-8454: ZkStateReader logging improvements and cleanup of dead code (Shai Erera, Anshum Gupta) +* SOLR-8455: RecovertStrategy logging improvements and sleep-between-recovery-attempts bug fix. + (Shai Erera) + ================== 5.4.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 85066584c45..aae53c72ea9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -17,6 +17,8 @@ package org.apache.solr.cloud; * limitations under the License. */ +import java.io.Closeable; + import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; @@ -37,7 +39,6 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.cloud.ClosableThread; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; @@ -66,15 +67,16 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RecoveryStrategy extends Thread implements ClosableThread { +public class RecoveryStrategy extends Thread implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000); private static final int MAX_RETRIES = 500; private static final int STARTING_RECOVERY_DELAY = 5000; private static final String REPLICATION_HANDLER = "/replication"; - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static interface RecoveryListener { public void recovered(); public void failed(); @@ -113,19 +115,16 @@ public class RecoveryStrategy extends Thread implements ClosableThread { @Override public void close() { close = true; - try { + if (prevSendPreRecoveryHttpUriRequest != null) { prevSendPreRecoveryHttpUriRequest.abort(); - } catch (NullPointerException e) { - // okay } - log.warn("Stopping recovery for core={} coreNodeName={}", coreName, coreZkNodeName); + LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName); } - private void recoveryFailed(final SolrCore core, final ZkController zkController, final String baseUrl, final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException { - SolrException.log(log, "Recovery failed - I give up."); + SolrException.log(LOG, "Recovery failed - I give up."); try { zkController.publish(cd, Replica.State.RECOVERY_FAILED); } finally { @@ -140,7 +139,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops); String leaderUrl = leaderCNodeProps.getCoreUrl(); - log.info("Attempting to replicate from " + leaderUrl + "."); + LOG.info("Attempting to replicate from [{}].", leaderUrl); // send commit commitOnLeader(leaderUrl); @@ -165,14 +164,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread { } // solrcloud_debug - if (log.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { try { RefCounted searchHolder = core .getNewestSearcher(false); SolrIndexSearcher searcher = searchHolder.get(); Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null); try { - log.debug(core.getCoreDescriptor().getCoreContainer() + LOG.debug(core.getCoreDescriptor().getCoreContainer() .getZkController().getNodeName() + " replicated " + searcher.search(new MatchAllDocsQuery(), 1).totalHits @@ -189,7 +188,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { searchHolder.decref(); } } catch (Exception e) { - log.debug("Error in solrcloud_debug block", e); + LOG.debug("Error in solrcloud_debug block", e); } } @@ -215,21 +214,21 @@ public class RecoveryStrategy extends Thread implements ClosableThread { try (SolrCore core = cc.getCore(coreName)) { if (core == null) { - SolrException.log(log, "SolrCore not found - cannot recover:" + coreName); + SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName); return; } MDCLoggingContext.setCore(core); - log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup); + LOG.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup); try { doRecovery(core); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - SolrException.log(log, "", e); + SolrException.log(LOG, "", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } catch (Exception e) { - log.error("", e); + LOG.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } } finally { @@ -245,7 +244,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { UpdateLog ulog; ulog = core.getUpdateHandler().getUpdateLog(); if (ulog == null) { - SolrException.log(log, "No UpdateLog found - cannot recover."); + SolrException.log(LOG, "No UpdateLog found - cannot recover."); recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor()); return; @@ -257,7 +256,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep()); } catch (Exception e) { - SolrException.log(log, "Corrupt tlog - ignoring.", e); + SolrException.log(LOG, "Corrupt tlog - ignoring.", e); recentVersions = new ArrayList<>(0); } @@ -265,24 +264,21 @@ public class RecoveryStrategy extends Thread implements ClosableThread { if (startingVersions != null && recoveringAfterStartup) { try { - int oldIdx = 0; // index of the start of the old list in the current - // list - long firstStartingVersion = startingVersions.size() > 0 ? startingVersions - .get(0) : 0; + int oldIdx = 0; // index of the start of the old list in the current list + long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0; for (; oldIdx < recentVersions.size(); oldIdx++) { if (recentVersions.get(oldIdx) == firstStartingVersion) break; } if (oldIdx > 0) { - log.info("####### Found new versions added after startup: num=" - + oldIdx); - log.info("###### currentVersions=" + recentVersions); + LOG.info("####### Found new versions added after startup: num=[{}]", oldIdx); + LOG.info("###### currentVersions=[{}]",recentVersions); } - log.info("###### startupVersions=" + startingVersions); + LOG.info("###### startupVersions=[{}]", startingVersions); } catch (Exception e) { - SolrException.log(log, "Error getting recent versions.", e); + SolrException.log(LOG, "Error getting recent versions.", e); recentVersions = new ArrayList<>(0); } } @@ -297,11 +293,11 @@ public class RecoveryStrategy extends Thread implements ClosableThread { // this means we were previously doing a full index replication // that probably didn't complete and buffering updates in the // meantime. - log.info("Looks like a previous replication recovery did not complete - skipping peer sync."); + LOG.info("Looks like a previous replication recovery did not complete - skipping peer sync."); firstTime = false; // skip peersync } } catch (Exception e) { - SolrException.log(log, "Error trying to get ulog starting operation.", e); + SolrException.log(LOG, "Error trying to get ulog starting operation.", e); firstTime = false; // skip peersync } } @@ -309,8 +305,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { Future replayFuture = null; while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though try { - CloudDescriptor cloudDesc = core.getCoreDescriptor() - .getCloudDescriptor(); + CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor(); ZkNodeProps leaderprops = zkStateReader.getLeaderRetry( cloudDesc.getCollectionName(), cloudDesc.getShardId()); @@ -327,23 +322,24 @@ public class RecoveryStrategy extends Thread implements ClosableThread { } if (cloudDesc.isLeader()) { // we are now the leader - no one else must have been suitable - log.warn("We have not yet recovered - but we are now the leader!"); - log.info("Finished recovery process."); + LOG.warn("We have not yet recovered - but we are now the leader!"); + LOG.info("Finished recovery process."); zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE); return; } - log.info("Begin buffering updates. core=" + coreName); + LOG.info("Begin buffering updates. core=[{}]", coreName); ulog.bufferUpdates(); replayed = false; - log.info("Publishing state of core " + core.getName() + " as recovering, leader is " + leaderUrl + " and I am " - + ourUrl); + LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl, + ourUrl); zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING); - final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId()); - + final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), + cloudDesc.getShardId()); + try { prevSendPreRecoveryHttpUriRequest.abort(); } catch (NullPointerException e) { @@ -351,14 +347,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread { } if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; } sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice); if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; } @@ -375,7 +371,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { // first thing we just try to sync if (firstTime) { firstTime = false; // only try sync the first time through the loop - log.info("Attempting to PeerSync from " + leaderUrl + " - recoveringAfterStartup="+recoveringAfterStartup); + LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup); // System.out.println("Attempting to PeerSync from " + leaderUrl // + " i am:" + zkController.getNodeName()); PeerSync peerSync = new PeerSync(core, @@ -387,27 +383,12 @@ public class RecoveryStrategy extends Thread implements ClosableThread { new ModifiableSolrParams()); // force open a new searcher core.getUpdateHandler().commit(new CommitUpdateCommand(req, false)); - log.info("PeerSync stage of recovery was successful."); + LOG.info("PeerSync stage of recovery was successful."); // solrcloud_debug - if (log.isDebugEnabled()) { - try { - RefCounted searchHolder = core - .getNewestSearcher(false); - SolrIndexSearcher searcher = searchHolder.get(); - try { - log.debug(core.getCoreDescriptor() - .getCoreContainer().getZkController().getNodeName() - + " synched " - + searcher.search(new MatchAllDocsQuery(), 1).totalHits); - } finally { - searchHolder.decref(); - } - } catch (Exception e) { - log.debug("Error in solrcloud_debug block", e); - } - } - log.info("Replaying updates buffered during PeerSync."); + cloudDebugLog(core, "synced"); + + LOG.info("Replaying updates buffered during PeerSync."); replay(core); replayed = true; @@ -416,22 +397,22 @@ public class RecoveryStrategy extends Thread implements ClosableThread { return; } - log.info("PeerSync Recovery was not successful - trying replication."); + LOG.info("PeerSync Recovery was not successful - trying replication."); } if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; } - log.info("Starting Replication Recovery."); + LOG.info("Starting Replication Recovery."); try { replicate(zkController.getNodeName(), core, leaderprops); if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; } @@ -439,29 +420,29 @@ public class RecoveryStrategy extends Thread implements ClosableThread { replayed = true; if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; } - log.info("Replication Recovery was successful."); + LOG.info("Replication Recovery was successful."); successfulRecovery = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.warn("Recovery was interrupted", e); + LOG.warn("Recovery was interrupted", e); close = true; } catch (Exception e) { - SolrException.log(log, "Error while trying to recover", e); + SolrException.log(LOG, "Error while trying to recover", e); } } catch (Exception e) { - SolrException.log(log, "Error while trying to recover. core=" + coreName, e); + SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e); } finally { if (!replayed) { // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date. // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will // reset our starting point for playback. - log.info("Replay not started, or was not successful... still buffering updates."); + LOG.info("Replay not started, or was not successful... still buffering updates."); /** this prev code is retained in case we want to switch strategies. try { @@ -472,11 +453,11 @@ public class RecoveryStrategy extends Thread implements ClosableThread { **/ } if (successfulRecovery) { - log.info("Registering as Active after recovery."); + LOG.info("Registering as Active after recovery."); try { zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE); } catch (Exception e) { - log.error("Could not publish as ACTIVE after succesful recovery", e); + LOG.error("Could not publish as ACTIVE after succesful recovery", e); successfulRecovery = false; } @@ -494,40 +475,43 @@ public class RecoveryStrategy extends Thread implements ClosableThread { try { if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; } - log.error("Recovery failed - trying again... (" + retries + ")"); + LOG.error("Recovery failed - trying again... (" + retries + ")"); retries++; if (retries >= MAX_RETRIES) { - SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ")."); + SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ")."); try { recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor()); } catch (Exception e) { - SolrException.log(log, "Could not publish that recovery failed", e); + SolrException.log(LOG, "Could not publish that recovery failed", e); } break; } } catch (Exception e) { - SolrException.log(log, "", e); + SolrException.log(LOG, "An error has occurred during recovery", e); } try { - // start at 1 sec and work up to a min - double loopCount = Math.min(Math.pow(2, retries), 60); - log.info("Wait {} seconds before trying to recover again ({})", loopCount, retries); + // Wait an exponential interval between retries, start at 5 seconds and work up to a minute. + // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result + // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in + // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m). + double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12; + LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries); for (int i = 0; i < loopCount; i++) { if (isClosed()) { - log.info("RecoveryStrategy has been closed"); + LOG.info("RecoveryStrategy has been closed"); break; // check if someone closed us } Thread.sleep(STARTING_RECOVERY_DELAY); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.warn("Recovery was interrupted.", e); + LOG.warn("Recovery was interrupted.", e); close = true; } } @@ -537,13 +521,11 @@ public class RecoveryStrategy extends Thread implements ClosableThread { // if replay was skipped (possibly to due pulling a full index from the leader), // then we still need to update version bucket seeds after recovery if (successfulRecovery && replayFuture == null) { - log.info("Updating version bucket highest from index after successful recovery."); + LOG.info("Updating version bucket highest from index after successful recovery."); core.seedVersionBuckets(); } - log.info("Finished recovery process, successful=", Boolean.toString(successfulRecovery)); - - + LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery)); } private Future replay(SolrCore core) @@ -551,40 +533,42 @@ public class RecoveryStrategy extends Thread implements ClosableThread { Future future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates(); if (future == null) { // no replay needed\ - log.info("No replay needed."); + LOG.info("No replay needed."); } else { - log.info("Replaying buffered documents."); + LOG.info("Replaying buffered documents."); // wait for replay RecoveryInfo report = future.get(); if (report.failed) { - SolrException.log(log, "Replay failed"); + SolrException.log(LOG, "Replay failed"); throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed"); } } // solrcloud_debug - if (log.isDebugEnabled()) { - try { - RefCounted searchHolder = core - .getNewestSearcher(false); - SolrIndexSearcher searcher = searchHolder.get(); - try { - log.debug(core.getCoreDescriptor().getCoreContainer() - .getZkController().getNodeName() - + " replayed " - + searcher.search(new MatchAllDocsQuery(), 1).totalHits); - } finally { - searchHolder.decref(); - } - } catch (Exception e) { - log.debug("Error in solrcloud_debug block", e); - } - } + cloudDebugLog(core, "replayed"); return future; } + + private void cloudDebugLog(SolrCore core, String op) { + if (!LOG.isDebugEnabled()) { + return; + } + try { + RefCounted searchHolder = core.getNewestSearcher(false); + SolrIndexSearcher searcher = searchHolder.get(); + try { + final int totalHits = searcher.search(new MatchAllDocsQuery(), 1).totalHits; + final String nodeName = core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName(); + LOG.debug("[{}] {} [{} total hits]", nodeName, op, totalHits); + } finally { + searchHolder.decref(); + } + } catch (Exception e) { + LOG.debug("Error in solrcloud_debug block", e); + } + } - @Override public boolean isClosed() { return close; } @@ -608,7 +592,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd); prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest; - log.info("Sending prep recovery command to {}; {}", leaderBaseUrl, prepCmd.toString()); + LOG.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString()); mrr.future.get(); }