diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index c461027827b..469c0a25cbc 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -96,6 +96,8 @@ Bug Fixes * SOLR-6709: Fix QueryResponse to deal with the "expanded" section when using the XMLResponseParser (Varun Thacker, Joel Bernstein) +* SOLR-7066: autoAddReplicas feature has bug when selecting replacement nodes. (Mark Miller) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java b/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java index 108452b338c..6a5bf314a66 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java @@ -149,7 +149,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { ClusterState clusterState = zkStateReader.getClusterState(); //check if we have disabled autoAddReplicas cluster wide String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS); - if (autoAddReplicas !=null && autoAddReplicas.equals("false")) { + if (autoAddReplicas != null && autoAddReplicas.equals("false")) { return; } if (clusterState != null) { @@ -164,15 +164,17 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { lastClusterStateVersion = clusterState.getZkClusterStateVersion(); Set collections = clusterState.getCollections(); for (final String collection : collections) { + log.debug("look at collection={}", collection); DocCollection docCollection = clusterState.getCollection(collection); if (!docCollection.getAutoAddReplicas()) { + log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName()); continue; } if (docCollection.getReplicationFactor() == null) { log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName()); continue; } - log.debug("Found collection, name={} replicationFactor=", collection, docCollection.getReplicationFactor()); + log.debug("Found collection, name={} replicationFactor={}", collection, docCollection.getReplicationFactor()); Collection slices = docCollection.getSlices(); for (Slice slice : slices) { @@ -182,7 +184,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas); - log.debug("replicationFactor={} goodReplicaCount={}", docCollection.getReplicationFactor(), goodReplicas); + log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas); if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) { // badReplicaMap.put(collection, badReplicas); @@ -199,7 +201,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { private void processBadReplicas(final String collection, final Collection badReplicas) { for (DownReplica badReplica : badReplicas) { - log.debug("process down replica {}", badReplica.replica.getName()); + log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection); String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP); Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl); if (wentBadAtNS == null) { @@ -252,7 +254,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { }); // wait to see state for core we just created - boolean success = ClusterStateUtil.waitToSeeLive(zkStateReader, collection, coreNodeName, createUrl, 30000); + boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000); if (!success) { log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate."); return false; @@ -304,8 +306,9 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { assert badReplica != null; assert badReplica.collection != null; assert badReplica.slice != null; - Map counts = new HashMap<>(); - ValueComparator vc = new ValueComparator(counts); + log.debug("getBestCreateUrl for " + badReplica.replica); + Map counts = new HashMap(); + Set unsuitableHosts = new HashSet(); Set liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes()); @@ -320,20 +323,20 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { for (Slice slice : slices) { // only look at active shards if (slice.getState() == Slice.State.ACTIVE) { - log.debug("look at slice {} as possible create candidate", slice.getName()); + log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection); Collection replicas = slice.getReplicas(); for (Replica replica : replicas) { liveNodes.remove(replica.getNodeName()); - if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals( + String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); + if (baseUrl.equals( badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) { continue; } - String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); // on a live node? - log.debug("nodename={} livenodes={}", replica.getNodeName(), clusterState.getLiveNodes()); + log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes()); boolean live = clusterState.liveNodesContain(replica.getNodeName()); - log.debug("look at replica {} as possible create candidate, live={}", replica.getName(), live); + log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live); if (live) { Counts cnt = counts.get(baseUrl); if (cnt == null) { @@ -351,8 +354,12 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { // TODO: this is collection wide and we want to take into // account cluster wide - use new cluster sys prop - int maxShardsPerNode = docCollection.getMaxShardsPerNode(); - log.debug("max shards per node={} good replicas={}", maxShardsPerNode, cnt); + Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode(); + if (maxShardsPerNode == null) { + log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName()); + maxShardsPerNode = Integer.MAX_VALUE; + } + log.debug("collection={} node={} max shards per node={} potential hosts={}", collection, baseUrl, maxShardsPerNode, cnt); Collection badSliceReplicas = null; DocCollection c = clusterState.getCollection(badReplica.collection.getName()); @@ -363,10 +370,13 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { } } boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl); - if (alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) { - counts.remove(replica.getStr(ZkStateReader.BASE_URL_PROP)); + if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) { + counts.remove(baseUrl); + unsuitableHosts.add(baseUrl); + log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt); } else { - counts.put(replica.getStr(ZkStateReader.BASE_URL_PROP), cnt); + counts.put(baseUrl, cnt); + log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt); } } } @@ -380,32 +390,35 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { } if (counts.size() == 0) { + log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName()); return null; } - Map sortedCounts = new TreeMap<>(vc); + ValueComparator vc = new ValueComparator(counts); + Map sortedCounts = new TreeMap(vc); sortedCounts.putAll(counts); - log.debug("empty nodes={}", liveNodes); - log.debug("sorted hosts={}", sortedCounts); + log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName()); + log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName()); + log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName()); return sortedCounts.keySet().iterator().next(); } private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection replicas, DownReplica badReplica, String baseUrl) { if (replicas != null) { - log.debug("check if replica already exists on node using replicas {}", getNames(replicas)); + log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas)); for (Replica replica : replicas) { final Replica.State state = replica.getState(); if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl) && clusterState.liveNodesContain(replica.getNodeName()) && (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) { - log.debug("replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.replica.getName(), replica.getName(), replica.getNodeName()); + log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName()); return true; } } } - log.debug("replica does not yet exist on node: {}", baseUrl); + log.debug("collection={} replica does not yet exist on node: {}", badReplica.collection.getName(), baseUrl); return false; } @@ -484,7 +497,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable { @Override public String toString() { return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount=" - + ourReplicas + "]"; + + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]"; } } diff --git a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java index e856ced1fa1..be595468526 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java @@ -17,6 +17,8 @@ package org.apache.solr.cloud; * limitations under the License. */ +import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; + import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -28,8 +30,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; - import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.lucene.util.LuceneTestCase.Nightly; import org.apache.lucene.util.LuceneTestCase.Slow; @@ -40,7 +40,6 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.cloud.hdfs.HdfsTestUtil; import org.apache.solr.common.cloud.ClusterStateUtil; -import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; @@ -51,7 +50,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + @Nightly @Slow @@ -148,37 +148,66 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa assertTrue(response2.isSuccess()); waitForRecoveriesToFinish(collection2, false); + + String collection3 = "solrj_collection3"; + createCollectionRequest = new Create(); + createCollectionRequest.setCollectionName(collection3); + createCollectionRequest.setNumShards(5); + createCollectionRequest.setReplicationFactor(1); + createCollectionRequest.setMaxShardsPerNode(1); + createCollectionRequest.setConfigName("conf1"); + createCollectionRequest.setRouterField("myOwnField"); + createCollectionRequest.setAutoAddReplicas(true); + CollectionAdminResponse response3 = createCollectionRequest.process(getCommonCloudSolrClient()); + + assertEquals(0, response3.getStatus()); + assertTrue(response3.isSuccess()); + + waitForRecoveriesToFinish(collection3, false); ChaosMonkey.stop(jettys.get(1)); ChaosMonkey.stop(jettys.get(2)); - Thread.sleep(3000); + Thread.sleep(5000); - assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000)); + assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000)); assertSliceAndReplicaCount(collection1); - assertEquals(4, getLiveAndActiveCount(collection1)); - assertTrue(getLiveAndActiveCount(collection2) < 4); + assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1)); + assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4); + // collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped + ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000); + + // collection1 should still be at 4 + assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1)); + // and collection2 less than 4 + assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4); + ChaosMonkey.stop(jettys); ChaosMonkey.stop(controlJetty); - assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllNotLive(cloudClient.getZkStateReader(), 45000)); + assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000)); ChaosMonkey.start(jettys); ChaosMonkey.start(controlJetty); - assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000)); + assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000)); assertSliceAndReplicaCount(collection1); - + assertSingleReplicationAndShardSize(collection3, 5); + int jettyIndex = random().nextInt(jettys.size()); ChaosMonkey.stop(jettys.get(jettyIndex)); ChaosMonkey.start(jettys.get(jettyIndex)); - - assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000)); - + + assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000)); + + assertSliceAndReplicaCount(collection1); + + assertSingleReplicationAndShardSize(collection3, 5); + ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000); //disable autoAddReplicas Map m = makeMap( "action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(), @@ -189,7 +218,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa request.setPath("/admin/collections"); cloudClient.request(request); - int currentCount = getLiveAndActiveCount(collection1); + int currentCount = ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1); ChaosMonkey.stop(jettys.get(3)); @@ -197,7 +226,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa //Hence waiting for 30 seconds to be on the safe side. Thread.sleep(30000); //Ensures that autoAddReplicas has not kicked in. - assertTrue(currentCount > getLiveAndActiveCount(collection1)); + assertTrue(currentCount > ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1)); //enable autoAddReplicas m = makeMap( @@ -208,24 +237,17 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa request.setPath("/admin/collections"); cloudClient.request(request); - assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000)); + assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000)); assertSliceAndReplicaCount(collection1); } - - private int getLiveAndActiveCount(String collection1) { + + private void assertSingleReplicationAndShardSize(String collection, int numSlices) { Collection slices; - slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection1); - int liveAndActive = 0; + slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection); + assertEquals(numSlices, slices.size()); for (Slice slice : slices) { - for (Replica replica : slice.getReplicas()) { - boolean live = cloudClient.getZkStateReader().getClusterState().liveNodesContain(replica.getNodeName()); - boolean active = replica.getState() == Replica.State.ACTIVE; - if (live && active) { - liveAndActive++; - } - } + assertEquals(1, slice.getReplicas().size()); } - return liveAndActive; } private void assertSliceAndReplicaCount(String collection) { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java index 9c24b76c0ad..89bf082e52b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java @@ -41,8 +41,8 @@ public class ClusterStateUtil { * how long to wait before giving up * @return false if timed out */ - public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, int timeoutInMs) { - return waitForAllActiveAndLive(zkStateReader, null, timeoutInMs); + public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, int timeoutInMs) { + return waitForAllActiveAndLiveReplicas(zkStateReader, null, timeoutInMs); } /** @@ -55,12 +55,12 @@ public class ClusterStateUtil { * how long to wait before giving up * @return false if timed out */ - public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, String collection, + public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, String collection, int timeoutInMs) { long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS); boolean success = false; - while (System.nanoTime() < timeout) { + while (!success && System.nanoTime() < timeout) { success = true; ClusterState clusterState = zkStateReader.getClusterState(); if (clusterState != null) { @@ -119,7 +119,7 @@ public class ClusterStateUtil { * how long to wait before giving up * @return false if timed out */ - public static boolean waitToSeeLive(ZkStateReader zkStateReader, + public static boolean waitToSeeLiveReplica(ZkStateReader zkStateReader, String collection, String coreNodeName, String baseUrl, int timeoutInMs) { long timeout = System.nanoTime() @@ -162,17 +162,17 @@ public class ClusterStateUtil { return false; } - public static boolean waitForAllNotLive(ZkStateReader zkStateReader, int timeoutInMs) { - return waitForAllNotLive(zkStateReader, null, timeoutInMs); + public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader, int timeoutInMs) { + return waitForAllReplicasNotLive(zkStateReader, null, timeoutInMs); } - public static boolean waitForAllNotLive(ZkStateReader zkStateReader, + public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader, String collection, int timeoutInMs) { long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS); boolean success = false; - while (System.nanoTime() < timeout) { + while (!success && System.nanoTime() < timeout) { success = true; ClusterState clusterState = zkStateReader.getClusterState(); if (clusterState != null) { @@ -215,6 +215,44 @@ public class ClusterStateUtil { return success; } + public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) { + Collection slices; + slices = zkStateReader.getClusterState().getActiveSlices(collection); + int liveAndActive = 0; + for (Slice slice : slices) { + for (Replica replica : slice.getReplicas()) { + boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName()); + boolean active = replica.getState() == Replica.State.ACTIVE; + if (live && active) { + liveAndActive++; + } + } + } + return liveAndActive; + } + + public static boolean waitForLiveAndActiveReplicaCount(ZkStateReader zkStateReader, + String collection, int replicaCount, int timeoutInMs) { + long timeout = System.nanoTime() + + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS); + boolean success = false; + while (!success && System.nanoTime() < timeout) { + success = getLiveAndActiveReplicaCount(zkStateReader, collection) == replicaCount; + + if (!success) { + try { + Thread.sleep(TIMEOUT_POLL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted"); + } + } + + } + + return success; + } + public static boolean isAutoAddReplicas(ZkStateReader reader, String collection) { ClusterState clusterState = reader.getClusterState(); if (clusterState != null) {