SOLR-7033, SOLR-5961: RecoveryStrategy should not publish any state when closed / cancelled and there should always be a pause between recoveries even when recoveries are rapidly stopped and started as well as when a node attempts to become the leader for a shard.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1658236 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2015-02-08 18:31:07 +00:00
parent 908da87486
commit f2bf0e2235
7 changed files with 188 additions and 34 deletions

View File

@ -619,6 +619,12 @@ Bug Fixes
* SOLR-6920: A replicated index can end up corrupted when small files end up with the same
file name and size. (Varun Thacker, Mark Miller)
* SOLR-7033, SOLR-5961: RecoveryStrategy should not publish any state when
closed / cancelled and there should always be a pause between recoveries
even when recoveries are rapidly stopped and started as well as when a
node attempts to become the leader for a shard.
(Mark Miller, Maxim Novikov)
Optimizations
----------------------

View File

@ -0,0 +1,68 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// this class may be accessed by multiple threads, but only one at a time
public class ActionThrottle {
private static Logger log = LoggerFactory.getLogger(ActionThrottle.class);
private volatile long lastActionStartedAt;
private volatile long minMsBetweenActions;
private final String name;
public ActionThrottle(String name, long minMsBetweenActions) {
this.name = name;
this.minMsBetweenActions = minMsBetweenActions;
}
public void markAttemptingAction() {
lastActionStartedAt = System.nanoTime();
}
public void minimumWaitBetweenActions() {
if (lastActionStartedAt == 0) {
return;
}
long diff = System.nanoTime() - lastActionStartedAt;
int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS);
long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS);
log.info("The last {} attempt started {}ms ago.", name, diffMs);
int sleep = 0;
if (diffMs > 0 && diff < minNsBetweenActions) {
sleep = (int) TimeUnit.MILLISECONDS.convert(minNsBetweenActions - diff, TimeUnit.NANOSECONDS);
} else if (diffMs == 0) {
sleep = (int) minMsBetweenActions;
}
if (sleep > 0) {
log.info("Throttling {} attempts - waiting for {}ms", name, sleep);
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -200,6 +200,21 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.info("Running the leader process for shard " + shardId);
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR,
"SolrCore not found:" + coreName + " in "
+ cc.getCoreNames());
}
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
}
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
// clear the leader in clusterstate
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
@ -217,7 +232,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (core == null) {
cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR,
"Fatal Error, SolrCore not found:" + coreName + " in "
"SolrCore not found:" + coreName + " in "
+ cc.getCoreNames());
}

View File

@ -68,8 +68,7 @@ import java.util.concurrent.Future;
public class RecoveryStrategy extends Thread implements ClosableThread {
private static final int MAX_RETRIES = 500;
private static final int INTERRUPTED = MAX_RETRIES + 1;
private static final int STARTING_RECOVERY_DELAY = 1000;
private static final int STARTING_RECOVERY_DELAY = 5000;
private static final String REPLICATION_HANDLER = "/replication";
@ -93,6 +92,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
// this should only be used from SolrCoreState
public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
@ -159,7 +159,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
if (isClosed()) retries = INTERRUPTED;
if (isClosed()) return; // we check closed on return
boolean success = replicationHandler.doFetch(solrParams, false);
if (!success) {
@ -233,12 +233,10 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (Exception e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
} finally {
SolrRequestInfo.clearRequestInfo();
@ -465,7 +463,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted", e);
retries = INTERRUPTED;
close = true;
} catch (Exception e) {
SolrException.log(log, "Error while trying to recover", e);
} finally {
@ -489,38 +487,22 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// Or do a fall off retry...
try {
log.error("Recovery failed - trying again... (" + retries + ") core=" + coreName);
if (isClosed()) {
retries = INTERRUPTED;
break;
}
log.error("Recovery failed - trying again... (" + retries + ") core=" + coreName);
retries++;
if (retries >= MAX_RETRIES) {
if (retries >= INTERRUPTED) {
SolrException.log(log, "Recovery failed - interrupted. core="
+ coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} catch (Exception e) {
SolrException.log(log,
"Could not publish that recovery failed", e);
}
} else {
SolrException.log(log,
"Recovery failed - max retries exceeded (" + retries + "). core=" + coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} catch (Exception e) {
SolrException.log(log,
"Could not publish that recovery failed", e);
}
SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + "). core=" + coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
} catch (Exception e) {
SolrException.log(log, "Could not publish that recovery failed", e);
}
break;
}
} catch (Exception e) {
SolrException.log(log, "core=" + coreName, e);
}
@ -536,7 +518,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted. core=" + coreName, e);
retries = INTERRUPTED;
close = true;
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.core.CoreContainer;
@ -40,6 +41,10 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
private final Object recoveryLock = new Object();
private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", 10000);
private final ActionThrottle leaderThrottle = new ActionThrottle("leader", 5000);
// protects pauseWriter and writerFree
private final Object writerPauseLock = new Object();
@ -313,6 +318,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
boolean recoveringAfterStartup = recoveryStrat == null;
recoveryThrottle.minimumWaitBetweenActions();
recoveryThrottle.markAttemptingAction();
recoveryStrat = new RecoveryStrategy(cc, cd, this);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
recoveryStrat.start();
@ -364,4 +372,10 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
return commitLock;
}
@Override
public ActionThrottle getLeaderThrottle() {
return leaderThrottle;
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
@ -140,4 +141,9 @@ public abstract class SolrCoreState {
public abstract void close(IndexWriterCloser closer);
/**
* @return throttle to limit how fast a core attempts to become leader
*/
public abstract ActionThrottle getLeaderThrottle();
}

View File

@ -0,0 +1,63 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ActionThrottleTest extends SolrTestCaseJ4 {
protected static Logger log = LoggerFactory.getLogger(ActionThrottleTest.class);
@Test
public void testBasics() throws Exception {
ActionThrottle at = new ActionThrottle("test", 1000);
long start = System.nanoTime();
at.minimumWaitBetweenActions();
// should be no wait
assertTrue(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) < 1000);
at.markAttemptingAction();
if (random().nextBoolean()) Thread.sleep(100);
at.minimumWaitBetweenActions();
long elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
start = System.nanoTime();
at.markAttemptingAction();
at.minimumWaitBetweenActions();
Thread.sleep(random().nextInt(1000));
elaspsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
}
}