SOLR-8455: Improve RecoveryStrategy logging and fix interval-between-recovery-attempts

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1721531 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2015-12-23 12:16:50 +00:00
parent d1623d09e5
commit 98f078aeb5
2 changed files with 97 additions and 110 deletions

View File

@ -378,6 +378,9 @@ Other Changes
* SOLR-8454: ZkStateReader logging improvements and cleanup of dead code (Shai Erera, Anshum Gupta)
* SOLR-8455: RecovertStrategy logging improvements and sleep-between-recovery-attempts bug fix.
(Shai Erera)
================== 5.4.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

View File

@ -17,6 +17,8 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@ -37,7 +39,6 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -66,15 +67,16 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RecoveryStrategy extends Thread implements ClosableThread {
public class RecoveryStrategy extends Thread implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
private static final int MAX_RETRIES = 500;
private static final int STARTING_RECOVERY_DELAY = 5000;
private static final String REPLICATION_HANDLER = "/replication";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static interface RecoveryListener {
public void recovered();
public void failed();
@ -113,19 +115,16 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
@Override
public void close() {
close = true;
try {
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.abort();
} catch (NullPointerException e) {
// okay
}
log.warn("Stopping recovery for core={} coreNodeName={}", coreName, coreZkNodeName);
LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
}
private void recoveryFailed(final SolrCore core,
final ZkController zkController, final String baseUrl,
final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
SolrException.log(log, "Recovery failed - I give up.");
SolrException.log(LOG, "Recovery failed - I give up.");
try {
zkController.publish(cd, Replica.State.RECOVERY_FAILED);
} finally {
@ -140,7 +139,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
String leaderUrl = leaderCNodeProps.getCoreUrl();
log.info("Attempting to replicate from " + leaderUrl + ".");
LOG.info("Attempting to replicate from [{}].", leaderUrl);
// send commit
commitOnLeader(leaderUrl);
@ -165,14 +164,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
// solrcloud_debug
if (log.isDebugEnabled()) {
if (LOG.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
try {
log.debug(core.getCoreDescriptor().getCoreContainer()
LOG.debug(core.getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName()
+ " replicated "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits
@ -189,7 +188,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
searchHolder.decref();
}
} catch (Exception e) {
log.debug("Error in solrcloud_debug block", e);
LOG.debug("Error in solrcloud_debug block", e);
}
}
@ -215,21 +214,21 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName);
return;
}
MDCLoggingContext.setCore(core);
log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
LOG.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
try {
doRecovery(core);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
SolrException.log(log, "", e);
SolrException.log(LOG, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (Exception e) {
log.error("", e);
LOG.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
} finally {
@ -245,7 +244,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
UpdateLog ulog;
ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover.");
SolrException.log(LOG, "No UpdateLog found - cannot recover.");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
@ -257,7 +256,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
} catch (Exception e) {
SolrException.log(log, "Corrupt tlog - ignoring.", e);
SolrException.log(LOG, "Corrupt tlog - ignoring.", e);
recentVersions = new ArrayList<>(0);
}
@ -265,24 +264,21 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
if (startingVersions != null && recoveringAfterStartup) {
try {
int oldIdx = 0; // index of the start of the old list in the current
// list
long firstStartingVersion = startingVersions.size() > 0 ? startingVersions
.get(0) : 0;
int oldIdx = 0; // index of the start of the old list in the current list
long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
for (; oldIdx < recentVersions.size(); oldIdx++) {
if (recentVersions.get(oldIdx) == firstStartingVersion) break;
}
if (oldIdx > 0) {
log.info("####### Found new versions added after startup: num="
+ oldIdx);
log.info("###### currentVersions=" + recentVersions);
LOG.info("####### Found new versions added after startup: num=[{}]", oldIdx);
LOG.info("###### currentVersions=[{}]",recentVersions);
}
log.info("###### startupVersions=" + startingVersions);
LOG.info("###### startupVersions=[{}]", startingVersions);
} catch (Exception e) {
SolrException.log(log, "Error getting recent versions.", e);
SolrException.log(LOG, "Error getting recent versions.", e);
recentVersions = new ArrayList<>(0);
}
}
@ -297,11 +293,11 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// this means we were previously doing a full index replication
// that probably didn't complete and buffering updates in the
// meantime.
log.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
LOG.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
firstTime = false; // skip peersync
}
} catch (Exception e) {
SolrException.log(log, "Error trying to get ulog starting operation.", e);
SolrException.log(LOG, "Error trying to get ulog starting operation.", e);
firstTime = false; // skip peersync
}
}
@ -309,8 +305,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
@ -327,23 +322,24 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
if (cloudDesc.isLeader()) {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader!");
log.info("Finished recovery process.");
LOG.warn("We have not yet recovered - but we are now the leader!");
LOG.info("Finished recovery process.");
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
return;
}
log.info("Begin buffering updates. core=" + coreName);
LOG.info("Begin buffering updates. core=[{}]", coreName);
ulog.bufferUpdates();
replayed = false;
log.info("Publishing state of core " + core.getName() + " as recovering, leader is " + leaderUrl + " and I am "
+ ourUrl);
LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
ourUrl);
zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(),
cloudDesc.getShardId());
try {
prevSendPreRecoveryHttpUriRequest.abort();
} catch (NullPointerException e) {
@ -351,14 +347,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break;
}
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break;
}
@ -375,7 +371,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
log.info("Attempting to PeerSync from " + leaderUrl + " - recoveringAfterStartup="+recoveringAfterStartup);
LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
@ -387,27 +383,12 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
new ModifiableSolrParams());
// force open a new searcher
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
log.info("PeerSync stage of recovery was successful.");
LOG.info("PeerSync stage of recovery was successful.");
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
log.debug(core.getCoreDescriptor()
.getCoreContainer().getZkController().getNodeName()
+ " synched "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits);
} finally {
searchHolder.decref();
}
} catch (Exception e) {
log.debug("Error in solrcloud_debug block", e);
}
}
log.info("Replaying updates buffered during PeerSync.");
cloudDebugLog(core, "synced");
LOG.info("Replaying updates buffered during PeerSync.");
replay(core);
replayed = true;
@ -416,22 +397,22 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
return;
}
log.info("PeerSync Recovery was not successful - trying replication.");
LOG.info("PeerSync Recovery was not successful - trying replication.");
}
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break;
}
log.info("Starting Replication Recovery.");
LOG.info("Starting Replication Recovery.");
try {
replicate(zkController.getNodeName(), core, leaderprops);
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break;
}
@ -439,29 +420,29 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
replayed = true;
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break;
}
log.info("Replication Recovery was successful.");
LOG.info("Replication Recovery was successful.");
successfulRecovery = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted", e);
LOG.warn("Recovery was interrupted", e);
close = true;
} catch (Exception e) {
SolrException.log(log, "Error while trying to recover", e);
SolrException.log(LOG, "Error while trying to recover", e);
}
} catch (Exception e) {
SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
} finally {
if (!replayed) {
// dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
// being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
// For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
// reset our starting point for playback.
log.info("Replay not started, or was not successful... still buffering updates.");
LOG.info("Replay not started, or was not successful... still buffering updates.");
/** this prev code is retained in case we want to switch strategies.
try {
@ -472,11 +453,11 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
**/
}
if (successfulRecovery) {
log.info("Registering as Active after recovery.");
LOG.info("Registering as Active after recovery.");
try {
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
} catch (Exception e) {
log.error("Could not publish as ACTIVE after succesful recovery", e);
LOG.error("Could not publish as ACTIVE after succesful recovery", e);
successfulRecovery = false;
}
@ -494,40 +475,43 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
try {
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break;
}
log.error("Recovery failed - trying again... (" + retries + ")");
LOG.error("Recovery failed - trying again... (" + retries + ")");
retries++;
if (retries >= MAX_RETRIES) {
SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
} catch (Exception e) {
SolrException.log(log, "Could not publish that recovery failed", e);
SolrException.log(LOG, "Could not publish that recovery failed", e);
}
break;
}
} catch (Exception e) {
SolrException.log(log, "", e);
SolrException.log(LOG, "An error has occurred during recovery", e);
}
try {
// start at 1 sec and work up to a min
double loopCount = Math.min(Math.pow(2, retries), 60);
log.info("Wait {} seconds before trying to recover again ({})", loopCount, retries);
// Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
// If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
// will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
// order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
for (int i = 0; i < loopCount; i++) {
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
LOG.info("RecoveryStrategy has been closed");
break; // check if someone closed us
}
Thread.sleep(STARTING_RECOVERY_DELAY);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted.", e);
LOG.warn("Recovery was interrupted.", e);
close = true;
}
}
@ -537,13 +521,11 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// if replay was skipped (possibly to due pulling a full index from the leader),
// then we still need to update version bucket seeds after recovery
if (successfulRecovery && replayFuture == null) {
log.info("Updating version bucket highest from index after successful recovery.");
LOG.info("Updating version bucket highest from index after successful recovery.");
core.seedVersionBuckets();
}
log.info("Finished recovery process, successful=", Boolean.toString(successfulRecovery));
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
private Future<RecoveryInfo> replay(SolrCore core)
@ -551,40 +533,42 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
if (future == null) {
// no replay needed\
log.info("No replay needed.");
LOG.info("No replay needed.");
} else {
log.info("Replaying buffered documents.");
LOG.info("Replaying buffered documents.");
// wait for replay
RecoveryInfo report = future.get();
if (report.failed) {
SolrException.log(log, "Replay failed");
SolrException.log(LOG, "Replay failed");
throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
}
}
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
log.debug(core.getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName()
+ " replayed "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits);
} finally {
searchHolder.decref();
}
} catch (Exception e) {
log.debug("Error in solrcloud_debug block", e);
}
}
cloudDebugLog(core, "replayed");
return future;
}
private void cloudDebugLog(SolrCore core, String op) {
if (!LOG.isDebugEnabled()) {
return;
}
try {
RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
final int totalHits = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
final String nodeName = core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName();
LOG.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
} finally {
searchHolder.decref();
}
} catch (Exception e) {
LOG.debug("Error in solrcloud_debug block", e);
}
}
@Override
public boolean isClosed() {
return close;
}
@ -608,7 +592,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
log.info("Sending prep recovery command to {}; {}", leaderBaseUrl, prepCmd.toString());
LOG.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
mrr.future.get();
}