From 41644bdcdcc0734115ce08ec24d6b408e1f8cf28 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Tue, 5 Dec 2017 14:00:56 +0100 Subject: [PATCH] SOLR-11458: Improve error handling in MoveReplicaCmd to avoid potential loss of data. --- solr/CHANGES.txt | 2 + .../org/apache/solr/cloud/ActionThrottle.java | 4 + .../java/org/apache/solr/cloud/Assign.java | 3 +- .../org/apache/solr/cloud/MoveReplicaCmd.java | 85 ++++++++++++++----- .../OverseerCollectionMessageHandler.java | 2 +- .../org/apache/solr/cloud/ZkController.java | 1 + .../cloud/autoscaling/ScheduledTriggers.java | 31 ++++--- .../solr/core/HdfsDirectoryFactory.java | 14 +-- .../handler/admin/CollectionsHandler.java | 2 + .../cloud/CollectionTooManyReplicasTest.java | 8 +- .../solr/cloud/MoveReplicaHDFSTest.java | 13 +++ .../apache/solr/cloud/MoveReplicaTest.java | 38 +++++++-- .../autoscaling/TriggerIntegrationTest.java | 40 ++++++--- .../solrj/request/CollectionAdminRequest.java | 14 +++ .../solr/common/cloud/ZkStateReader.java | 1 + .../solr/common/params/CommonAdminParams.java | 7 +- .../collections.collection.Commands.json | 24 +++++- 17 files changed, 221 insertions(+), 68 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 456363f26e2..5a8c2b234fd 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -168,6 +168,8 @@ Bug Fixes * SOLR-9137: bin/solr script ignored custom STOP_PORT on shutdown. (Joachim Kohlhammer, Steve Rowe, Christine Poerschke) +* SOLR-11458: Improve error handling in MoveReplicaCmd to avoid potential loss of data. (ab) + Optimizations ---------------------- * SOLR-11285: Refactor autoscaling framework to avoid direct references to Zookeeper and Solr diff --git a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java index 9476c3c9d74..f60332c0587 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java +++ b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java @@ -52,6 +52,10 @@ public class ActionThrottle { this.lastActionStartedAt = lastActionStartedAt; this.timeSource = TimeSource.NANO_TIME; } + + public void reset() { + lastActionStartedAt = null; + } public void markAttemptingAction() { lastActionStartedAt = timeSource.getTime(); diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java index 1b488811ef6..9967b230592 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java @@ -457,7 +457,8 @@ public class Assign { if (createNodeList != null) { // Overrides petty considerations about maxShardsPerNode if (createNodeList.size() != nodeNameVsShardCount.size()) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "At least one of the node(s) specified are not currently active, no action taken."); + "At least one of the node(s) specified " + createNodeList + " are not currently active in " + + nodeNameVsShardCount.keySet() + ", no action taken."); } return nodeNameVsShardCount; } diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java index a2ed4074316..71d5c824d7d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java @@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import 
org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; @@ -45,6 +46,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE; +import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT; import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; public class MoveReplicaCmd implements Cmd{ @@ -63,11 +66,12 @@ public class MoveReplicaCmd implements Cmd{ private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { log.debug("moveReplica() : {}", Utils.toJSONString(message)); - ocmh.checkRequired(message, COLLECTION_PROP, "targetNode"); + ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE); String collection = message.getStr(COLLECTION_PROP); - String targetNode = message.getStr("targetNode"); + String targetNode = message.getStr(CollectionParams.TARGET_NODE); boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); - int timeout = message.getInt("timeout", 10 * 60); // 10 minutes + boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true); + int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes String async = message.getStr(ASYNC); @@ -75,6 +79,9 @@ public class MoveReplicaCmd implements Cmd{ if (coll == null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist"); } + if (!clusterState.getLiveNodes().contains(targetNode)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes()); + } Replica replica = null; if (message.containsKey(REPLICA_PROP)) { String replicaName = message.getStr(REPLICA_PROP); @@ -86,12 +93,17 @@ public class MoveReplicaCmd implements Cmd{ } else { String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE)); if (sourceNode == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode is a required param" ); + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE + + "' or '" + CollectionParams.FROM_NODE + "' is a required param"); } String shardId = message.getStr(SHARD_ID_PROP); + if (shardId == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param"); + } Slice slice = clusterState.getCollection(collection).getSlice(shardId); List sliceReplicas = new ArrayList<>(slice.getReplicas()); Collections.shuffle(sliceReplicas, RANDOM); + // this picks up a single random replica from the sourceNode - for (Replica r : slice.getReplicas()) { + for (Replica r : sliceReplicas) { if (r.getNodeName().equals(sourceNode)) { replica = r; } if (replica == null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Collection: " + collection + " node: " + sourceNode + " do not have any replica belong to shard: " + shardId); + "Collection: " + collection + " node: " + sourceNode + " does not have any 
replica belonging to shard: " + shardId); } } - log.info("Replica will be moved {}", replica); + log.info("Replica will be moved to node {}: {}", targetNode, replica); Slice slice = null; for (Slice s : coll.getSlices()) { if (s.getReplicas().contains(replica)) { @@ -112,9 +124,13 @@ public class MoveReplicaCmd implements Cmd{ } assert slice != null; Object dataDir = replica.get("dataDir"); - if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) { + boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null; + + if (isSharedFS && inPlaceMove) { + log.debug("-- moveHdfsReplica"); moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState); } else { + log.debug("-- moveNormalReplica (inPlaceMove=" + inPlaceMove + ", isSharedFS=" + isSharedFS + ")"); moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState); } } @@ -135,10 +151,10 @@ public class MoveReplicaCmd implements Cmd{ NamedList deleteResult = new NamedList(); ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null); if (deleteResult.get("failure") != null) { - String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s", - coll.getName(), slice.getName(), replica.getName()); + String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s", + coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure")); log.warn(errorString); - results.add("failure", errorString + ", because of : " + deleteResult.get("failure")); + results.add("failure", errorString); return; } @@ -165,17 +181,48 @@ public class MoveReplicaCmd implements Cmd{ CoreAdminParams.NODE, targetNode, CoreAdminParams.CORE_NODE_NAME, replica.getName(), CoreAdminParams.NAME, replica.getCoreName(), + WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState), SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState, CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)), CoreAdminParams.DATA_DIR, dataDir); if(async!=null) addReplicasProps.getProperties().put(ASYNC, async); NamedList addResult = new NamedList(); - ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null); + try { + ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null); + } catch (Exception e) { + // fatal error - try rolling back + String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + + " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); + results.add("failure", errorString); + log.warn("Error adding replica " + addReplicasProps + " - trying to roll back...", e); + addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName()); + NamedList rollback = new NamedList(); + ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null); + if (rollback.get("failure") != null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica + + ", collection may be inconsistent: " + rollback.get("failure")); + } + return; + } if (addResult.get("failure") != null) { String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + - " on node=%s", coll.getName(), slice.getName(), 
targetNode); + " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); log.warn(errorString); results.add("failure", errorString); + log.debug("--- trying to roll back..."); + // try to roll back + addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName()); + NamedList rollback = new NamedList(); + try { + ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica + + ", collection may be inconsistent!", e); + } + if (rollback.get("failure") != null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica + + ", collection may be inconsistent! Failure: " + rollback.get("failure")); + } return; } else { String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " + "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode); @@ -192,20 +239,20 @@ public class MoveReplicaCmd implements Cmd{ SHARD_ID_PROP, slice.getName(), CoreAdminParams.NODE, targetNode, CoreAdminParams.NAME, newCoreName); - if(async!=null) addReplicasProps.getProperties().put(ASYNC, async); + if (async != null) addReplicasProps.getProperties().put(ASYNC, async); NamedList addResult = new NamedList(); SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh); ActiveReplicaWatcher watcher = null; ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null); - log.info("props " + props); + log.debug("props " + props); if (replica.equals(slice.getLeader()) || waitForFinalState) { watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch); - log.info("-- registered watcher " + watcher); + log.debug("-- registered watcher " + watcher); ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher); } if (addResult.get("failure") != null) { String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + - " on node=%s", coll.getName(), slice.getName(), targetNode); + " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); log.warn(errorString); results.add("failure", errorString); if (watcher != null) { // unregister @@ -239,10 +286,10 @@ public class MoveReplicaCmd implements Cmd{ NamedList deleteResult = new NamedList(); ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null); if (deleteResult.get("failure") != null) { - String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s", - coll.getName(), slice.getName(), replica.getName()); + String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s", + coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure")); log.warn(errorString); - results.add("failure", errorString + ", because of : " + deleteResult.get("failure")); + results.add("failure", errorString); } else { String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " + "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode); diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index 
8db306fc7a7..26f17098bfd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -698,7 +698,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, if (result.size() == coreNames.size()) { return result; } else { - log.debug("Expecting {} cores but found {}", coreNames.size(), result.size()); + log.debug("Expecting {} cores but found {}", coreNames, result); } if (timeout.hasTimedOut()) { throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index cc4a5905b2c..365da65f9cb 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1417,6 +1417,7 @@ public class ZkController { } if (core != null && core.getDirectoryFactory().isSharedStorage()) { if (core.getDirectoryFactory().isSharedStorage()) { + props.put(ZkStateReader.SHARED_STORAGE_PROP, "true"); props.put("dataDir", core.getDataDir()); UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); if (ulog != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java index a57faa4826c..82789770b8d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java @@ -118,7 +118,7 @@ public class ScheduledTriggers implements Closeable { private final AtomicReference actionThrottle; - private final SolrCloudManager dataProvider; + private final SolrCloudManager cloudManager; private final DistribStateManager stateManager; @@ -130,15 +130,15 @@ public class ScheduledTriggers implements Closeable { private AutoScalingConfig autoScalingConfig; - public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager dataProvider) { + public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager cloudManager) { scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(DEFAULT_TRIGGER_CORE_POOL_SIZE, new DefaultSolrThreadFactory("ScheduledTrigger")); scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor")); actionThrottle = new AtomicReference<>(new ActionThrottle("action", TimeUnit.SECONDS.toMillis(DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS))); - this.dataProvider = dataProvider; - this.stateManager = dataProvider.getDistribStateManager(); + this.cloudManager = cloudManager; + this.stateManager = cloudManager.getDistribStateManager(); this.loader = loader; queueStats = new Stats(); listeners = new TriggerListeners(); @@ -198,6 +198,11 @@ public class ScheduledTriggers implements Closeable { } } this.autoScalingConfig = autoScalingConfig; + + // reset cooldown and actionThrottle + cooldownStart.set(System.nanoTime() - cooldownPeriod.get()); + actionThrottle.get().reset(); + listeners.setAutoScalingConfig(autoScalingConfig); } @@ -215,12 +220,12 @@ public class ScheduledTriggers implements Closeable { } ScheduledTrigger st; try { - st = new 
ScheduledTrigger(newTrigger, dataProvider, queueStats); + st = new ScheduledTrigger(newTrigger, cloudManager, queueStats); } catch (Exception e) { if (isClosed) { throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore"); } - if (dataProvider.isClosed()) { + if (cloudManager.isClosed()) { log.error("Failed to add trigger " + newTrigger.getName() + " - closing or disconnected from data provider", e); } else { log.error("Failed to add trigger " + newTrigger.getName(), e); @@ -241,7 +246,7 @@ public class ScheduledTriggers implements Closeable { scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger); } newTrigger.setProcessor(event -> { - if (dataProvider.isClosed()) { + if (cloudManager.isClosed()) { String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because Solr has been shutdown.", event.toString()); log.warn(msg); listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg); @@ -296,7 +301,7 @@ public class ScheduledTriggers implements Closeable { // this event so that we continue processing other events and not block this action executor waitForPendingTasks(newTrigger, actions); - ActionContext actionContext = new ActionContext(dataProvider, newTrigger, new HashMap<>()); + ActionContext actionContext = new ActionContext(cloudManager, newTrigger, new HashMap<>()); for (TriggerAction action : actions) { List beforeActions = (List) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList()); beforeActions.add(action.getName()); @@ -346,7 +351,7 @@ public class ScheduledTriggers implements Closeable { } private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List actions) throws AlreadyClosedException { - DistribStateManager stateManager = dataProvider.getDistribStateManager(); + DistribStateManager stateManager = cloudManager.getDistribStateManager(); try { for (TriggerAction action : actions) { @@ -365,7 +370,7 @@ public class ScheduledTriggers implements Closeable { String requestid = (String) map.get("requestid"); try { log.debug("Found pending task with requestid={}", requestid); - RequestStatusResponse statusResponse = waitForTaskToFinish(dataProvider, requestid, + RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, requestid, ExecutePlanAction.DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (statusResponse != null) { RequestStatusState state = statusResponse.getRequestStatus(); @@ -374,7 +379,7 @@ public class ScheduledTriggers implements Closeable { } } } catch (Exception e) { - if (dataProvider.isClosed()) { + if (cloudManager.isClosed()) { throw e; // propagate the abort to the caller } Throwable rootCause = ExceptionUtils.getRootCause(e); @@ -395,7 +400,7 @@ public class ScheduledTriggers implements Closeable { Thread.currentThread().interrupt(); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted", e); } catch (Exception e) { - if (dataProvider.isClosed()) { + if (cloudManager.isClosed()) { throw new AlreadyClosedException("The Solr instance has been shutdown"); } // we catch but don't rethrow because a failure to wait for pending tasks @@ -618,7 +623,7 @@ public class ScheduledTriggers implements Closeable { } if (listener != null) { try { - listener.init(dataProvider, config); + listener.init(cloudManager, config); listenersPerName.put(config.name, listener); } catch (Exception e) { log.warn("Error initializing TriggerListener " + config, e); diff --git 
a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java index 260a991d1ce..f248152b4fc 100644 --- a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java +++ b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java @@ -18,6 +18,7 @@ package org.apache.solr.core; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.URLEncoder; @@ -551,18 +552,21 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol return accept; } }); + } catch (FileNotFoundException fnfe) { + // already deleted - ignore + LOG.debug("Old index directory already deleted - skipping...", fnfe); } catch (IOException ioExc) { LOG.error("Error checking for old index directories to clean-up.", ioExc); } - - List oldIndexPaths = new ArrayList<>(oldIndexDirs.length); - for (FileStatus ofs : oldIndexDirs) { - oldIndexPaths.add(ofs.getPath()); - } if (oldIndexDirs == null || oldIndexDirs.length == 0) return; // nothing to clean-up + List oldIndexPaths = new ArrayList<>(oldIndexDirs.length); + for (FileStatus ofs : oldIndexDirs) { + oldIndexPaths.add(ofs.getPath()); + } + Collections.sort(oldIndexPaths, Collections.reverseOrder()); Set livePaths = getLivePaths(); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 2204c05a544..de066d5bd68 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -131,6 +131,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE; import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.common.params.CommonParams.VALUE_LONG; @@ -921,6 +922,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission CollectionParams.SOURCE_NODE, CollectionParams.TARGET_NODE, WAIT_FOR_FINAL_STATE, + IN_PLACE_MOVE, "replica", "shard"); }), diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionTooManyReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionTooManyReplicasTest.java index a1c217589ea..daa267dd9f4 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CollectionTooManyReplicasTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CollectionTooManyReplicasTest.java @@ -195,8 +195,8 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase { .setNode(deadNode) .process(cluster.getSolrClient()); }); - assertTrue("Should have gotten a message about shard not ", - e1.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken.")); + assertTrue("Should have gotten a message about shard not currently active: " + e1.toString(), + e1.toString().contains("At least one of the node(s) specified [" + deadNode + "] are not currently active in")); // Should also die 
if we just add a shard Exception e2 = expectThrows(Exception.class, () -> { @@ -205,8 +205,8 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase { .process(cluster.getSolrClient()); }); - assertTrue("Should have gotten a message about shard not ", - e2.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken.")); + assertTrue("Should have gotten a message about shard not currently active: " + e2.toString(), + e2.toString().contains("At least one of the node(s) specified [" + deadNode + "] are not currently active in")); } finally { cluster.startJettySolrRunner(jetty); diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java index fb60d39d884..f5e9e7ed879 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java @@ -25,6 +25,7 @@ import org.apache.solr.util.BadHdfsThreadsFilter; import org.apache.solr.util.LogLevel; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Test; /** * @@ -56,6 +57,18 @@ public class MoveReplicaHDFSTest extends MoveReplicaTest { dfsCluster = null; } + @Test + public void testNormalMove() throws Exception { + inPlaceMove = false; + test(); + } + + @Test + public void testNormalFailedMove() throws Exception { + inPlaceMove = false; + testFailedMove(); + } + public static class ForkJoinThreadsFilter implements ThreadFilter { @Override diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java index 3e116fcf93f..631d94992a0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java @@ -48,6 +48,7 @@ import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.util.IdUtils; import org.apache.solr.util.LogLevel; import org.junit.Before; import org.junit.BeforeClass; @@ -62,6 +63,9 @@ public class MoveReplicaTest extends SolrCloudTestCase { private static ZkStateReaderAccessor accessor; private static int overseerLeaderIndex; + // used by MoveReplicaHDFSTest + protected boolean inPlaceMove = true; + @BeforeClass public static void setupCluster() throws Exception { configureCluster(4) @@ -91,12 +95,17 @@ public class MoveReplicaTest extends SolrCloudTestCase { @Before public void beforeTest() throws Exception { cluster.deleteAllCollections(); + // restart any shut down nodes + for (int i = cluster.getJettySolrRunners().size(); i < 5; i++) { + cluster.startJettySolrRunner(); + } + cluster.waitForAllNodes(5000); + inPlaceMove = true; } @Test public void test() throws Exception { - cluster.waitForAllNodes(5000); - String coll = "movereplicatest_coll"; + String coll = getTestClass().getSimpleName() + "_coll_" + inPlaceMove; log.info("total_jettys: " + cluster.getJettySolrRunners().size()); int REPLICATION = 2; @@ -104,6 +113,7 @@ public class MoveReplicaTest extends SolrCloudTestCase { CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION); create.setMaxShardsPerNode(2); + create.setAutoAddReplicas(false); cloudClient.request(create); addDocs(coll, 100); @@ -133,8 +143,10 @@ public class MoveReplicaTest extends 
SolrCloudTestCase { int targetNumCores = getNumOfCores(cloudClient, targetNode, coll); CollectionAdminRequest.MoveReplica moveReplica = createMoveReplicaRequest(coll, replica, targetNode); - moveReplica.processAsync("000", cloudClient); - CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000"); + moveReplica.setInPlaceMove(inPlaceMove); + String asyncId = IdUtils.randomId(); + moveReplica.processAsync(asyncId, cloudClient); + CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(asyncId); // wait for async request success boolean success = false; for (int i = 0; i < 200; i++) { @@ -193,6 +205,7 @@ public class MoveReplicaTest extends SolrCloudTestCase { assertEquals(watchers, newWatchers); moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId); + moveReplica.setInPlaceMove(inPlaceMove); moveReplica.process(cloudClient); checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores); // wait for recovery @@ -232,12 +245,14 @@ public class MoveReplicaTest extends SolrCloudTestCase { assertTrue("replica never fully recovered", recovered); newWatchers = new HashSet<>(accessor.getStateWatchers(coll)); assertEquals(watchers, newWatchers); + + assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); } - @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11458") + // @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11458") @Test public void testFailedMove() throws Exception { - String coll = "movereplicatest_failed_coll"; + String coll = getTestClass().getSimpleName() + "_failed_coll_" + inPlaceMove; int REPLICATION = 2; CloudSolrClient cloudClient = cluster.getSolrClient(); @@ -245,6 +260,7 @@ public class MoveReplicaTest extends SolrCloudTestCase { Set watchers = new HashSet<>(accessor.getStateWatchers(coll)); CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION); + create.setAutoAddReplicas(false); cloudClient.request(create); addDocs(coll, 100); @@ -262,15 +278,17 @@ public class MoveReplicaTest extends SolrCloudTestCase { } assertNotNull(targetNode); CollectionAdminRequest.MoveReplica moveReplica = createMoveReplicaRequest(coll, replica, targetNode); + moveReplica.setInPlaceMove(inPlaceMove); // start moving - moveReplica.processAsync("001", cloudClient); + String asyncId = IdUtils.randomId(); + moveReplica.processAsync(asyncId, cloudClient); // shut down target node for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) { if (cluster.getJettySolrRunner(i).getNodeName().equals(targetNode)) { cluster.stopJettySolrRunner(i); } } - CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("001"); + CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(asyncId); // wait for async request success boolean success = true; for (int i = 0; i < 200; i++) { @@ -286,6 +304,7 @@ public class MoveReplicaTest extends SolrCloudTestCase { Set newWatchers = new HashSet<>(accessor.getStateWatchers(coll)); assertEquals(watchers, newWatchers); + log.info("--- current collection state: " + cloudClient.getZkStateReader().getClusterState().getCollection(coll)); assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); } @@ -375,4 +394,5 @@ public class MoveReplicaTest extends SolrCloudTestCase { } solrClient.commit(collection); Thread.sleep(5000); - }} + } +} 
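
Usage note: the tests above drive the new client-side options through CollectionAdminRequest.MoveReplica. The following is a minimal SolrJ sketch of the same flow; the collection, replica, target node and ZooKeeper address are hypothetical placeholders, not values taken from this patch:

    import java.io.IOException;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;

    public class MoveReplicaExample {
      public static void main(String[] args) throws SolrServerException, IOException {
        try (CloudSolrClient client = new CloudSolrClient.Builder()
            .withZkHost("localhost:9983").build()) {
          // move replica core_node3 of collection "myColl" to another node
          CollectionAdminRequest.MoveReplica move =
              new CollectionAdminRequest.MoveReplica("myColl", "core_node3", "127.0.0.1:8984_solr");
          move.setInPlaceMove(false); // added by this patch: force a full copy instead of reusing shared dataDir/ulogDir
          move.setTimeout(20 * 60);   // added by this patch: seconds to wait for the new replica to become active
          move.process(client);       // on failure the command now reports the cause and attempts a rollback
        }
      }
    }

If the rollback itself fails, the command now throws a SERVER_ERROR stating that the collection may be inconsistent, rather than silently leaving the source replica deleted.
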
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java index fd74c9e34bd..cacf39c14ea 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -119,6 +119,10 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { @Before public void setupTest() throws Exception { + // clear any persisted auto scaling configuration + Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true); + log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion()); + throttlingDelayMs.set(TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS)); waitForSeconds = 1 + random().nextInt(3); actionConstructorCalled = new CountDownLatch(1); @@ -130,9 +134,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { actionCompleted = new CountDownLatch(1); events.clear(); listenerEvents.clear(); - // clear any persisted auto scaling configuration - Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true); - log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion()); + lastActionExecutedAt.set(0); // clear any events or markers // todo: consider the impact of such cleanup on regular cluster restarts deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH); @@ -201,7 +203,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { JettySolrRunner newNode = cluster.startJettySolrRunner(); - if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) { + if (!triggerFiredLatch.await(30, TimeUnit.SECONDS)) { fail("Both triggers should have fired by now"); } @@ -249,7 +251,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { } } - if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) { + if (!triggerFiredLatch.await(30, TimeUnit.SECONDS)) { fail("Both triggers should have fired by now"); } } @@ -272,16 +274,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { return; } try { + long currentTime = timeSource.getTime(); if (lastActionExecutedAt.get() != 0) { - log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime() + " expected diff: " + TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS)); - if (timeSource.getTime() - lastActionExecutedAt.get() < TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS)) { + long minDiff = TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS); + log.info("last action at " + lastActionExecutedAt.get() + " current time = " + currentTime + + "\nreal diff: " + (currentTime - lastActionExecutedAt.get()) + + "\n min diff: " + minDiff); + if (currentTime - lastActionExecutedAt.get() < minDiff) { log.info("action executed again before minimum wait time from {}", event.getSource()); fail("TriggerListener was fired before the throttling period"); } } if (onlyOnce.compareAndSet(false, true)) { log.info("action executed from {}", event.getSource()); - lastActionExecutedAt.set(timeSource.getTime()); + lastActionExecutedAt.set(currentTime); getTriggerFiredLatch().countDown(); } else { log.info("action executed more than once from {}", event.getSource()); @@ -1225,6 +1231,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { assertTrue("timestamp delta is 
less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS)); prevTimestamp = ev.timestamp; + // this also resets the cooldown period long modifiedCooldownPeriodSeconds = 7; String setPropertiesCommand = "{\n" + "\t\"set-properties\" : {\n" + @@ -1243,13 +1250,24 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { JettySolrRunner newNode3 = cluster.startJettySolrRunner(); await = triggerFiredLatch.await(20, TimeUnit.SECONDS); assertTrue("The trigger did not fire at all", await); + triggerFiredLatch = new CountDownLatch(1); + triggerFired.compareAndSet(true, false); + // add another node + JettySolrRunner newNode4 = cluster.startJettySolrRunner(); + await = triggerFiredLatch.await(20, TimeUnit.SECONDS); + assertTrue("The trigger did not fire at all", await); // wait for listener to capture the SUCCEEDED stage Thread.sleep(2000); - // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event + // there must be at least one SUCCEEDED (due to newNode3) then for newNode4 one IGNORED + // event due to cooldown, and one SUCCEEDED capturedEvents = listenerEvents.get("bar"); - assertTrue(capturedEvents.toString(), capturedEvents.size() > 1); - for (int i = 0; i < capturedEvents.size() - 1; i++) { + assertTrue(capturedEvents.toString(), capturedEvents.size() > 2); + // first event should be SUCCEEDED + ev = capturedEvents.get(0); + assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage); + + for (int i = 1; i < capturedEvents.size() - 1; i++) { ev = capturedEvents.get(i); assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage); assertTrue(ev.toString(), ev.message.contains("cooldown")); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 08811bd7d42..edc5a8be1ce 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -617,6 +617,8 @@ public abstract class CollectionAdminRequest protected String collection, replica, targetNode; protected String shard, sourceNode; protected boolean randomlyMoveReplica; + protected boolean inPlaceMove = true; + protected int timeout = -1; public MoveReplica(String collection, String replica, String targetNode) { super(CollectionAction.MOVEREPLICA); @@ -635,11 +637,23 @@ public abstract class CollectionAdminRequest this.randomlyMoveReplica = true; } + public void setInPlaceMove(boolean inPlaceMove) { + this.inPlaceMove = inPlaceMove; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + @Override public SolrParams getParams() { ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); params.set("collection", collection); params.set(CollectionParams.TARGET_NODE, targetNode); + params.set(CommonAdminParams.IN_PLACE_MOVE, inPlaceMove); + if (timeout != -1) { + params.set(CommonAdminParams.TIMEOUT, timeout); + } if (randomlyMoveReplica) { params.set("shard", shard); params.set(CollectionParams.SOURCE_NODE, sourceNode); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index e7781512581..4c2be1a57a7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ 
b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -89,6 +89,7 @@ public class ZkStateReader implements Closeable { public static final String SHARD_PARENT_PROP = "shard_parent"; public static final String NUM_SHARDS_PROP = "numShards"; public static final String LEADER_PROP = "leader"; + public static final String SHARED_STORAGE_PROP = "shared_storage"; public static final String PROPERTY_PROP = "property"; public static final String PROPERTY_PROP_PREFIX = "property."; public static final String PROPERTY_VALUE_PROP = "property.value"; diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java index f20afa7ca23..c39b4a88dbf 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java @@ -19,7 +19,12 @@ package org.apache.solr.common.params; public interface CommonAdminParams { - /** async or not? **/ + /** Async or not? **/ String ASYNC = "async"; + /** Wait for final state of the operation. */ String WAIT_FOR_FINAL_STATE = "waitForFinalState"; + /** Allow in-place move of replicas that use shared filesystems. */ + String IN_PLACE_MOVE = "inPlaceMove"; + /** Timeout for replicas to become active. */ + String TIMEOUT = "timeout"; } diff --git a/solr/solrj/src/resources/apispec/collections.collection.Commands.json b/solr/solrj/src/resources/apispec/collections.collection.Commands.json index dfae2f2bc90..0cd3644b48d 100644 --- a/solr/solrj/src/resources/apispec/collections.collection.Commands.json +++ b/solr/solrj/src/resources/apispec/collections.collection.Commands.json @@ -20,7 +20,7 @@ "move-replica": { "type": "object", "documentation": "https://lucene.apache.org/solr/guide/collections-api.html#movereplica", - "description": "This command moves a replica from one node to a new node. In case of shared filesystems the `dataDir` will be reused.", + "description": "This command moves a replica from one node to a new node. In case of shared filesystems the `dataDir` and `ulogDir` may be reused.", "properties": { "replica": { "type": "string", @@ -32,13 +32,29 @@ }, "sourceNode": { "type": "string", - "description": "The name of the node that contains the replica" + "description": "The name of the node that contains the replica." }, "targetNode": { "type": "string", - "description": "The name of the destination node. This parameter is required" + "description": "The name of the destination node. This parameter is required." + }, + "waitForFinalState": { + "type": "boolean", + "default": "false", + "description": "Wait for the moved replica to become active." + }, + "timeout": { + "type": "integer", + "default": 600, + "description": "Timeout to wait for replica to become active. For very large replicas this may need to be increased." + }, + "inPlaceMove": { + "type": "boolean", + "default": "true", + "description": "For replicas that use shared filesystems allow 'in-place' move that reuses shared data." } - } + }, + "required":["targetNode"] }, "migrate-docs":{ "type":"object",