SOLR-11458: Improve error handling in MoveReplicaCmd to avoid potential loss of data.

Andrzej Bialecki 2017-12-05 14:00:56 +01:00
parent 9268b2b30f
commit 41644bdcdc
17 changed files with 221 additions and 68 deletions

View File

@ -168,6 +168,8 @@ Bug Fixes
* SOLR-9137: bin/solr script ignored custom STOP_PORT on shutdown.
(Joachim Kohlhammer, Steve Rowe, Christine Poerschke)
* SOLR-11458: Improve error handling in MoveReplicaCmd to avoid potential loss of data. (ab)
Optimizations
----------------------
* SOLR-11285: Refactor autoscaling framework to avoid direct references to Zookeeper and Solr

View File

@ -53,6 +53,10 @@ public class ActionThrottle {
this.timeSource = TimeSource.NANO_TIME;
}
public void reset() {
lastActionStartedAt = null;
}
public void markAttemptingAction() {
lastActionStartedAt = timeSource.getTime();
}
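The new reset() clears the throttle's memory of the last attempt; ScheduledTriggers (further down in this diff) calls it, together with resetting its cooldown start, whenever a new autoscaling configuration is set, so a changed throttle period takes effect immediately. A minimal usage sketch, assuming the constructor shown later in this diff:

import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.ActionThrottle;

ActionThrottle throttle = new ActionThrottle("action", TimeUnit.SECONDS.toMillis(5));
throttle.markAttemptingAction(); // records "now" as the last attempt
// ... a new autoscaling config arrives, possibly with a shorter period ...
throttle.reset();                // forgets lastActionStartedAt: no carried-over wait
throttle.markAttemptingAction(); // throttling restarts cleanly under the new config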

View File

@ -457,7 +457,8 @@ public class Assign {
if (createNodeList != null) { // Overrides petty considerations about maxShardsPerNode
if (createNodeList.size() != nodeNameVsShardCount.size()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"At least one of the node(s) specified are not currently active, no action taken.");
"At least one of the node(s) specified " + createNodeList + " are not currently active in "
+ nodeNameVsShardCount.keySet() + ", no action taken.");
}
return nodeNameVsShardCount;
}

View File

@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
@ -45,6 +46,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
public class MoveReplicaCmd implements Cmd{
@ -63,11 +66,12 @@ public class MoveReplicaCmd implements Cmd{
private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
log.debug("moveReplica() : {}", Utils.toJSONString(message));
ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE);
String collection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr("targetNode");
String targetNode = message.getStr(CollectionParams.TARGET_NODE);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true);
int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
String async = message.getStr(ASYNC);
@ -75,6 +79,9 @@ public class MoveReplicaCmd implements Cmd{
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
}
if (!clusterState.getLiveNodes().contains(targetNode)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes());
}
Replica replica = null;
if (message.containsKey(REPLICA_PROP)) {
String replicaName = message.getStr(REPLICA_PROP);
@ -86,12 +93,17 @@ public class MoveReplicaCmd implements Cmd{
} else {
String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE));
if (sourceNode == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode is a required param" );
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE +
"' or '" + CollectionParams.FROM_NODE + "' is a required param");
}
String shardId = message.getStr(SHARD_ID_PROP);
if (shardId == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
}
Slice slice = clusterState.getCollection(collection).getSlice(shardId);
List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
Collections.shuffle(sliceReplicas, RANDOM);
// this picks up a single random replica from the sourceNode
for (Replica r : sliceReplicas) {
if (r.getNodeName().equals(sourceNode)) {
replica = r;
@ -99,11 +111,11 @@ public class MoveReplicaCmd implements Cmd{
}
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " node: " + sourceNode + " do not have any replica belong to shard: " + shardId);
"Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
}
}
log.info("Replica will be moved {}", replica);
log.info("Replica will be moved to node {}: {}", targetNode, replica);
Slice slice = null;
for (Slice s : coll.getSlices()) {
if (s.getReplicas().contains(replica)) {
@ -112,9 +124,13 @@ public class MoveReplicaCmd implements Cmd{
}
assert slice != null;
Object dataDir = replica.get("dataDir");
if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null;
if (isSharedFS && inPlaceMove) {
log.debug("-- moveHdfsReplica");
moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState);
} else {
log.debug("-- moveNormalReplica (inPlaceMove=" + inPlaceMove + ", isSharedFS=" + isSharedFS);
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState);
}
}
@ -135,10 +151,10 @@ public class MoveReplicaCmd implements Cmd{
NamedList deleteResult = new NamedList();
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
coll.getName(), slice.getName(), replica.getName());
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
results.add("failure", errorString);
return;
}
@ -165,17 +181,48 @@ public class MoveReplicaCmd implements Cmd{
CoreAdminParams.NODE, targetNode,
CoreAdminParams.CORE_NODE_NAME, replica.getName(),
CoreAdminParams.NAME, replica.getCoreName(),
WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState),
SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState,
CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)),
CoreAdminParams.DATA_DIR, dataDir);
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null);
try {
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null);
} catch (Exception e) {
// fatal error - try rolling back
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
results.add("failure", errorString);
log.warn("Error adding replica " + addReplicasProps + " - trying to roll back...", e);
addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
NamedList rollback = new NamedList();
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
if (rollback.get("failure") != null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ ", collection may be inconsistent: " + rollback.get("failure"));
}
return;
}
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
" on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
log.debug("--- trying to roll back...");
// try to roll back
addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
NamedList rollback = new NamedList();
try {
ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ ", collection may be inconsistent!", e);
}
if (rollback.get("failure") != null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ ", collection may be inconsistent! Failure: " + rollback.get("failure"));
}
return;
} else {
String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
@ -192,20 +239,20 @@ public class MoveReplicaCmd implements Cmd{
SHARD_ID_PROP, slice.getName(),
CoreAdminParams.NODE, targetNode,
CoreAdminParams.NAME, newCoreName);
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = null;
ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
log.info("props " + props);
log.debug("props " + props);
if (replica.equals(slice.getLeader()) || waitForFinalState) {
watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);
log.info("-- registered watcher " + watcher);
log.debug("-- registered watcher " + watcher);
ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
}
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
" on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
if (watcher != null) { // unregister
@ -239,10 +286,10 @@ public class MoveReplicaCmd implements Cmd{
NamedList deleteResult = new NamedList();
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
coll.getName(), slice.getName(), replica.getName());
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
results.add("failure", errorString);
} else {
String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
"to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);

View File

@ -698,7 +698,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (result.size() == coreNames.size()) {
return result;
} else {
log.debug("Expecting {} cores but found {}", coreNames.size(), result.size());
log.debug("Expecting {} cores but found {}", coreNames, result);
}
if (timeout.hasTimedOut()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);

View File

@ -1417,6 +1417,7 @@ public class ZkController {
}
if (core != null && core.getDirectoryFactory().isSharedStorage()) {
if (core.getDirectoryFactory().isSharedStorage()) {
props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
props.put("dataDir", core.getDataDir());
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog != null) {
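The shared_storage flag published here is what MoveReplicaCmd's new isSharedFS check reads, replacing the old dataDir.startsWith("hdfs:/") heuristic. A sketch of the replica properties a shared-storage core now advertises (the HDFS paths are hypothetical example values):

import java.util.HashMap;
import java.util.Map;
import org.apache.solr.common.cloud.ZkStateReader;

Map<String, Object> props = new HashMap<>();
props.put(ZkStateReader.SHARED_STORAGE_PROP, "true"); // "shared_storage", new in this commit
props.put("dataDir", "hdfs://nn:8020/solr/coll1/core_node1/data/");
props.put("ulogDir", "hdfs://nn:8020/solr/coll1/core_node1/data/tlog");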

View File

@ -118,7 +118,7 @@ public class ScheduledTriggers implements Closeable {
private final AtomicReference<ActionThrottle> actionThrottle;
private final SolrCloudManager dataProvider;
private final SolrCloudManager cloudManager;
private final DistribStateManager stateManager;
@ -130,15 +130,15 @@ public class ScheduledTriggers implements Closeable {
private AutoScalingConfig autoScalingConfig;
public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager dataProvider) {
public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager cloudManager) {
scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(DEFAULT_TRIGGER_CORE_POOL_SIZE,
new DefaultSolrThreadFactory("ScheduledTrigger"));
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
actionThrottle = new AtomicReference<>(new ActionThrottle("action", TimeUnit.SECONDS.toMillis(DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS)));
this.dataProvider = dataProvider;
this.stateManager = dataProvider.getDistribStateManager();
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
this.loader = loader;
queueStats = new Stats();
listeners = new TriggerListeners();
@ -198,6 +198,11 @@ public class ScheduledTriggers implements Closeable {
}
}
this.autoScalingConfig = autoScalingConfig;
// reset cooldown and actionThrottle
cooldownStart.set(System.nanoTime() - cooldownPeriod.get());
actionThrottle.get().reset();
listeners.setAutoScalingConfig(autoScalingConfig);
}
@ -215,12 +220,12 @@ public class ScheduledTriggers implements Closeable {
}
ScheduledTrigger st;
try {
st = new ScheduledTrigger(newTrigger, dataProvider, queueStats);
st = new ScheduledTrigger(newTrigger, cloudManager, queueStats);
} catch (Exception e) {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
}
if (dataProvider.isClosed()) {
if (cloudManager.isClosed()) {
log.error("Failed to add trigger " + newTrigger.getName() + " - closing or disconnected from data provider", e);
} else {
log.error("Failed to add trigger " + newTrigger.getName(), e);
@ -241,7 +246,7 @@ public class ScheduledTriggers implements Closeable {
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
newTrigger.setProcessor(event -> {
if (dataProvider.isClosed()) {
if (cloudManager.isClosed()) {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because Solr has been shutdown.", event.toString());
log.warn(msg);
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
@ -296,7 +301,7 @@ public class ScheduledTriggers implements Closeable {
// this event so that we continue processing other events and not block this action executor
waitForPendingTasks(newTrigger, actions);
ActionContext actionContext = new ActionContext(dataProvider, newTrigger, new HashMap<>());
ActionContext actionContext = new ActionContext(cloudManager, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
List<String> beforeActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList<String>());
beforeActions.add(action.getName());
@ -346,7 +351,7 @@ public class ScheduledTriggers implements Closeable {
}
private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
DistribStateManager stateManager = dataProvider.getDistribStateManager();
DistribStateManager stateManager = cloudManager.getDistribStateManager();
try {
for (TriggerAction action : actions) {
@ -365,7 +370,7 @@ public class ScheduledTriggers implements Closeable {
String requestid = (String) map.get("requestid");
try {
log.debug("Found pending task with requestid={}", requestid);
RequestStatusResponse statusResponse = waitForTaskToFinish(dataProvider, requestid,
RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, requestid,
ExecutePlanAction.DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (statusResponse != null) {
RequestStatusState state = statusResponse.getRequestStatus();
@ -374,7 +379,7 @@ public class ScheduledTriggers implements Closeable {
}
}
} catch (Exception e) {
if (dataProvider.isClosed()) {
if (cloudManager.isClosed()) {
throw e; // propagate the abort to the caller
}
Throwable rootCause = ExceptionUtils.getRootCause(e);
@ -395,7 +400,7 @@ public class ScheduledTriggers implements Closeable {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted", e);
} catch (Exception e) {
if (dataProvider.isClosed()) {
if (cloudManager.isClosed()) {
throw new AlreadyClosedException("The Solr instance has been shutdown");
}
// we catch but don't rethrow because a failure to wait for pending tasks
@ -618,7 +623,7 @@ public class ScheduledTriggers implements Closeable {
}
if (listener != null) {
try {
listener.init(dataProvider, config);
listener.init(cloudManager, config);
listenersPerName.put(config.name, listener);
} catch (Exception e) {
log.warn("Error initializing TriggerListener " + config, e);

View File

@ -18,6 +18,7 @@ package org.apache.solr.core;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URLEncoder;
@ -551,18 +552,21 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
return accept;
}
});
} catch (FileNotFoundException fnfe) {
// already deleted - ignore
LOG.debug("Old index directory already deleted - skipping...", fnfe);
} catch (IOException ioExc) {
LOG.error("Error checking for old index directories to clean-up.", ioExc);
}
if (oldIndexDirs == null || oldIndexDirs.length == 0)
return; // nothing to clean-up
List<Path> oldIndexPaths = new ArrayList<>(oldIndexDirs.length);
for (FileStatus ofs : oldIndexDirs) {
oldIndexPaths.add(ofs.getPath());
}
if (oldIndexDirs == null || oldIndexDirs.length == 0)
return; // nothing to clean-up
Collections.sort(oldIndexPaths, Collections.reverseOrder());
Set<String> livePaths = getLivePaths();

View File

@ -131,6 +131,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
@ -921,6 +922,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CollectionParams.SOURCE_NODE,
CollectionParams.TARGET_NODE,
WAIT_FOR_FINAL_STATE,
IN_PLACE_MOVE,
"replica",
"shard");
}),

View File

@ -195,8 +195,8 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
.setNode(deadNode)
.process(cluster.getSolrClient());
});
assertTrue("Should have gotten a message about shard not ",
e1.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken."));
assertTrue("Should have gotten a message about shard not currently active: " + e1.toString(),
e1.toString().contains("At least one of the node(s) specified [" + deadNode + "] are not currently active in"));
// Should also die if we just add a shard
Exception e2 = expectThrows(Exception.class, () -> {
@ -205,8 +205,8 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
.process(cluster.getSolrClient());
});
assertTrue("Should have gotten a message about shard not ",
e2.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken."));
assertTrue("Should have gotten a message about shard not currently active: " + e2.toString(),
e2.toString().contains("At least one of the node(s) specified [" + deadNode + "] are not currently active in"));
}
finally {
cluster.startJettySolrRunner(jetty);

View File

@ -25,6 +25,7 @@ import org.apache.solr.util.BadHdfsThreadsFilter;
import org.apache.solr.util.LogLevel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
*
@ -56,6 +57,18 @@ public class MoveReplicaHDFSTest extends MoveReplicaTest {
dfsCluster = null;
}
@Test
public void testNormalMove() throws Exception {
inPlaceMove = false;
test();
}
@Test
public void testNormalFailedMove() throws Exception {
inPlaceMove = false;
testFailedMove();
}
public static class ForkJoinThreadsFilter implements ThreadFilter {
@Override

View File

@ -48,6 +48,7 @@ import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.IdUtils;
import org.apache.solr.util.LogLevel;
import org.junit.Before;
import org.junit.BeforeClass;
@ -62,6 +63,9 @@ public class MoveReplicaTest extends SolrCloudTestCase {
private static ZkStateReaderAccessor accessor;
private static int overseerLeaderIndex;
// used by MoveReplicaHDFSTest
protected boolean inPlaceMove = true;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
@ -91,12 +95,17 @@ public class MoveReplicaTest extends SolrCloudTestCase {
@Before
public void beforeTest() throws Exception {
cluster.deleteAllCollections();
// restart any shut down nodes
for (int i = cluster.getJettySolrRunners().size(); i < 5; i++) {
cluster.startJettySolrRunner();
}
cluster.waitForAllNodes(5000);
inPlaceMove = true;
}
@Test
public void test() throws Exception {
cluster.waitForAllNodes(5000);
String coll = "movereplicatest_coll";
String coll = getTestClass().getSimpleName() + "_coll_" + inPlaceMove;
log.info("total_jettys: " + cluster.getJettySolrRunners().size());
int REPLICATION = 2;
@ -104,6 +113,7 @@ public class MoveReplicaTest extends SolrCloudTestCase {
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
create.setMaxShardsPerNode(2);
create.setAutoAddReplicas(false);
cloudClient.request(create);
addDocs(coll, 100);
@ -133,8 +143,10 @@ public class MoveReplicaTest extends SolrCloudTestCase {
int targetNumCores = getNumOfCores(cloudClient, targetNode, coll);
CollectionAdminRequest.MoveReplica moveReplica = createMoveReplicaRequest(coll, replica, targetNode);
moveReplica.processAsync("000", cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
moveReplica.setInPlaceMove(inPlaceMove);
String asyncId = IdUtils.randomId();
moveReplica.processAsync(asyncId, cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(asyncId);
// wait for async request success
boolean success = false;
for (int i = 0; i < 200; i++) {
@ -193,6 +205,7 @@ public class MoveReplicaTest extends SolrCloudTestCase {
assertEquals(watchers, newWatchers);
moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId);
moveReplica.setInPlaceMove(inPlaceMove);
moveReplica.process(cloudClient);
checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores);
// wait for recovery
@ -232,12 +245,14 @@ public class MoveReplicaTest extends SolrCloudTestCase {
assertTrue("replica never fully recovered", recovered);
newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
assertEquals(watchers, newWatchers);
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
}
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11458")
// @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11458")
@Test
public void testFailedMove() throws Exception {
String coll = "movereplicatest_failed_coll";
String coll = getTestClass().getSimpleName() + "_failed_coll_" + inPlaceMove;
int REPLICATION = 2;
CloudSolrClient cloudClient = cluster.getSolrClient();
@ -245,6 +260,7 @@ public class MoveReplicaTest extends SolrCloudTestCase {
Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
create.setAutoAddReplicas(false);
cloudClient.request(create);
addDocs(coll, 100);
@ -262,15 +278,17 @@ public class MoveReplicaTest extends SolrCloudTestCase {
}
assertNotNull(targetNode);
CollectionAdminRequest.MoveReplica moveReplica = createMoveReplicaRequest(coll, replica, targetNode);
moveReplica.setInPlaceMove(inPlaceMove);
// start moving
moveReplica.processAsync("001", cloudClient);
String asyncId = IdUtils.randomId();
moveReplica.processAsync(asyncId, cloudClient);
// shut down target node
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
if (cluster.getJettySolrRunner(i).getNodeName().equals(targetNode)) {
cluster.stopJettySolrRunner(i);
}
}
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("001");
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(asyncId);
// wait for async request success
boolean success = true;
for (int i = 0; i < 200; i++) {
@ -286,6 +304,7 @@ public class MoveReplicaTest extends SolrCloudTestCase {
Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
assertEquals(watchers, newWatchers);
log.info("--- current collection state: " + cloudClient.getZkStateReader().getClusterState().getCollection(coll));
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
}
@ -375,4 +394,5 @@ public class MoveReplicaTest extends SolrCloudTestCase {
}
solrClient.commit(collection);
Thread.sleep(5000);
}}
}
}

View File

@ -119,6 +119,10 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Before
public void setupTest() throws Exception {
// clear any persisted auto scaling configuration
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
throttlingDelayMs.set(TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS));
waitForSeconds = 1 + random().nextInt(3);
actionConstructorCalled = new CountDownLatch(1);
@ -130,9 +134,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
actionCompleted = new CountDownLatch(1);
events.clear();
listenerEvents.clear();
// clear any persisted auto scaling configuration
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
lastActionExecutedAt.set(0);
// clear any events or markers
// todo: consider the impact of such cleanup on regular cluster restarts
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
@ -201,7 +203,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
JettySolrRunner newNode = cluster.startJettySolrRunner();
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
if (!triggerFiredLatch.await(30, TimeUnit.SECONDS)) {
fail("Both triggers should have fired by now");
}
@ -249,7 +251,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
if (!triggerFiredLatch.await(30, TimeUnit.SECONDS)) {
fail("Both triggers should have fired by now");
}
}
@ -272,16 +274,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
return;
}
try {
long currentTime = timeSource.getTime();
if (lastActionExecutedAt.get() != 0) {
log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime() + " expected diff: " + TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS));
if (timeSource.getTime() - lastActionExecutedAt.get() < TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS)) {
long minDiff = TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS);
log.info("last action at " + lastActionExecutedAt.get() + " current time = " + currentTime +
"\nreal diff: " + (currentTime - lastActionExecutedAt.get()) +
"\n min diff: " + minDiff);
if (currentTime - lastActionExecutedAt.get() < minDiff) {
log.info("action executed again before minimum wait time from {}", event.getSource());
fail("TriggerListener was fired before the throttling period");
}
}
if (onlyOnce.compareAndSet(false, true)) {
log.info("action executed from {}", event.getSource());
lastActionExecutedAt.set(timeSource.getTime());
lastActionExecutedAt.set(currentTime);
getTriggerFiredLatch().countDown();
} else {
log.info("action executed more than once from {}", event.getSource());
@ -1225,6 +1231,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
prevTimestamp = ev.timestamp;
// this also resets the cooldown period
long modifiedCooldownPeriodSeconds = 7;
String setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
@ -1243,13 +1250,24 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
JettySolrRunner newNode3 = cluster.startJettySolrRunner();
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
triggerFiredLatch = new CountDownLatch(1);
triggerFired.compareAndSet(true, false);
// add another node
JettySolrRunner newNode4 = cluster.startJettySolrRunner();
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
// wait for listener to capture the SUCCEEDED stage
Thread.sleep(2000);
// there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
// there must be at least one SUCCEEDED event (due to newNode3), then for newNode4
// one IGNORED event due to cooldown, followed by another SUCCEEDED
capturedEvents = listenerEvents.get("bar");
assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
for (int i = 0; i < capturedEvents.size() - 1; i++) {
assertTrue(capturedEvents.toString(), capturedEvents.size() > 2);
// first event should be SUCCEEDED
ev = capturedEvents.get(0);
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
for (int i = 1; i < capturedEvents.size() - 1; i++) {
ev = capturedEvents.get(i);
assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
assertTrue(ev.toString(), ev.message.contains("cooldown"));

View File

@ -617,6 +617,8 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected String collection, replica, targetNode;
protected String shard, sourceNode;
protected boolean randomlyMoveReplica;
protected boolean inPlaceMove = true;
protected int timeout = -1;
public MoveReplica(String collection, String replica, String targetNode) {
super(CollectionAction.MOVEREPLICA);
@ -635,11 +637,23 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
this.randomlyMoveReplica = true;
}
public void setInPlaceMove(boolean inPlaceMove) {
this.inPlaceMove = inPlaceMove;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
params.set("collection", collection);
params.set(CollectionParams.TARGET_NODE, targetNode);
params.set(CommonAdminParams.IN_PLACE_MOVE, inPlaceMove);
if (timeout != -1) {
params.set(CommonAdminParams.TIMEOUT, timeout);
}
if (randomlyMoveReplica) {
params.set("shard", shard);
params.set(CollectionParams.SOURCE_NODE, sourceNode);
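With the new setters, a client can force a copy-based move even when the replica lives on shared storage, or stretch the activation timeout for large indexes. A hedged SolrJ usage sketch (ZooKeeper address, collection, replica, and node names are example values):

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

CloudSolrClient client = new CloudSolrClient.Builder().withZkHost("zk1:2181").build();
CollectionAdminRequest.MoveReplica move =
    new CollectionAdminRequest.MoveReplica("coll1", "core_node6", "node2:8983_solr");
move.setInPlaceMove(false); // copy the index instead of reusing the shared dataDir
move.setTimeout(300);       // seconds to wait for the new replica to become active
move.process(client);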

View File

@ -89,6 +89,7 @@ public class ZkStateReader implements Closeable {
public static final String SHARD_PARENT_PROP = "shard_parent";
public static final String NUM_SHARDS_PROP = "numShards";
public static final String LEADER_PROP = "leader";
public static final String SHARED_STORAGE_PROP = "shared_storage";
public static final String PROPERTY_PROP = "property";
public static final String PROPERTY_PROP_PREFIX = "property.";
public static final String PROPERTY_VALUE_PROP = "property.value";

View File

@ -19,7 +19,12 @@ package org.apache.solr.common.params;
public interface CommonAdminParams
{
/** async or not? **/
/** Async or not? **/
String ASYNC = "async";
/** Wait for final state of the operation. */
String WAIT_FOR_FINAL_STATE = "waitForFinalState";
/** Allow in-place move of replicas that use shared filesystems. */
String IN_PLACE_MOVE = "inPlaceMove";
/** Timeout for replicas to become active. */
String TIMEOUT = "timeout";
}

View File

@ -20,7 +20,7 @@
"move-replica": {
"type": "object",
"documentation": "https://lucene.apache.org/solr/guide/collections-api.html#movereplica",
"description": "This command moves a replica from one node to a new node. In case of shared filesystems the `dataDir` will be reused.",
"description": "This command moves a replica from one node to a new node. In case of shared filesystems the `dataDir` and `ulogDir` may be reused.",
"properties": {
"replica": {
"type": "string",
@ -32,13 +32,29 @@
},
"sourceNode": {
"type": "string",
"description": "The name of the node that contains the replica"
"description": "The name of the node that contains the replica."
},
"targetNode": {
"type": "string",
"description": "The name of the destination node. This parameter is required"
"description": "The name of the destination node. This parameter is required."
},
"waitForFinalState": {
"type": "boolean",
"default": "false",
"description": "Wait for the moved replica to become active."
},
"timeout": {
"type": "integer",
"default": 600,
"description": "Timeout to wait for replica to become active. For very large replicas this may need to be increased."
},
"inPlaceMove": {
"type": "boolean",
"default": "true",
"description": "For replicas that use shared filesystems allow 'in-place' move that reuses shared data."
}
}
},
"required":["targetNode"]
},
"migrate-docs":{
"type":"object",