SOLR-7066: autoAddReplicas feature has bug when selecting replacement nodes.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1672201 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2015-04-08 21:49:21 +00:00
parent c08d81cc39
commit f3d352501a
4 changed files with 137 additions and 62 deletions

View File

@ -96,6 +96,8 @@ Bug Fixes
* SOLR-6709: Fix QueryResponse to deal with the "expanded" section when using the XMLResponseParser
(Varun Thacker, Joel Bernstein)
* SOLR-7066: autoAddReplicas feature has bug when selecting replacement nodes. (Mark Miller)
Optimizations
----------------------

View File

@ -149,7 +149,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
ClusterState clusterState = zkStateReader.getClusterState();
//check if we have disabled autoAddReplicas cluster wide
String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS);
if (autoAddReplicas !=null && autoAddReplicas.equals("false")) {
if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
return;
}
if (clusterState != null) {
@ -164,15 +164,17 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
lastClusterStateVersion = clusterState.getZkClusterStateVersion();
Set<String> collections = clusterState.getCollections();
for (final String collection : collections) {
log.debug("look at collection={}", collection);
DocCollection docCollection = clusterState.getCollection(collection);
if (!docCollection.getAutoAddReplicas()) {
log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
continue;
}
if (docCollection.getReplicationFactor() == null) {
log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
continue;
}
log.debug("Found collection, name={} replicationFactor=", collection, docCollection.getReplicationFactor());
log.debug("Found collection, name={} replicationFactor={}", collection, docCollection.getReplicationFactor());
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
@ -182,7 +184,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
log.debug("replicationFactor={} goodReplicaCount={}", docCollection.getReplicationFactor(), goodReplicas);
log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
// badReplicaMap.put(collection, badReplicas);
@ -199,7 +201,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
for (DownReplica badReplica : badReplicas) {
log.debug("process down replica {}", badReplica.replica.getName());
log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
if (wentBadAtNS == null) {
@ -252,7 +254,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
});
// wait to see state for core we just created
boolean success = ClusterStateUtil.waitToSeeLive(zkStateReader, collection, coreNodeName, createUrl, 30000);
boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
if (!success) {
log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
return false;
@ -304,8 +306,9 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
assert badReplica != null;
assert badReplica.collection != null;
assert badReplica.slice != null;
Map<String,Counts> counts = new HashMap<>();
ValueComparator vc = new ValueComparator(counts);
log.debug("getBestCreateUrl for " + badReplica.replica);
Map<String,Counts> counts = new HashMap<String, Counts>();
Set<String> unsuitableHosts = new HashSet<String>();
Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
@ -320,20 +323,20 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
for (Slice slice : slices) {
// only look at active shards
if (slice.getState() == Slice.State.ACTIVE) {
log.debug("look at slice {} as possible create candidate", slice.getName());
log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection);
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
liveNodes.remove(replica.getNodeName());
if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals(
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
if (baseUrl.equals(
badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
continue;
}
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
// on a live node?
log.debug("nodename={} livenodes={}", replica.getNodeName(), clusterState.getLiveNodes());
log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
boolean live = clusterState.liveNodesContain(replica.getNodeName());
log.debug("look at replica {} as possible create candidate, live={}", replica.getName(), live);
log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live);
if (live) {
Counts cnt = counts.get(baseUrl);
if (cnt == null) {
@ -351,8 +354,12 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
// TODO: this is collection wide and we want to take into
// account cluster wide - use new cluster sys prop
int maxShardsPerNode = docCollection.getMaxShardsPerNode();
log.debug("max shards per node={} good replicas={}", maxShardsPerNode, cnt);
Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
if (maxShardsPerNode == null) {
log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
maxShardsPerNode = Integer.MAX_VALUE;
}
log.debug("collection={} node={} max shards per node={} potential hosts={}", collection, baseUrl, maxShardsPerNode, cnt);
Collection<Replica> badSliceReplicas = null;
DocCollection c = clusterState.getCollection(badReplica.collection.getName());
@ -363,10 +370,13 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
}
}
boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
if (alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
counts.remove(replica.getStr(ZkStateReader.BASE_URL_PROP));
if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
counts.remove(baseUrl);
unsuitableHosts.add(baseUrl);
log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
} else {
counts.put(replica.getStr(ZkStateReader.BASE_URL_PROP), cnt);
counts.put(baseUrl, cnt);
log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
}
}
}
@ -380,32 +390,35 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
}
if (counts.size() == 0) {
log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
return null;
}
Map<String,Counts> sortedCounts = new TreeMap<>(vc);
ValueComparator vc = new ValueComparator(counts);
Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
sortedCounts.putAll(counts);
log.debug("empty nodes={}", liveNodes);
log.debug("sorted hosts={}", sortedCounts);
log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
return sortedCounts.keySet().iterator().next();
}
private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
if (replicas != null) {
log.debug("check if replica already exists on node using replicas {}", getNames(replicas));
log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
for (Replica replica : replicas) {
final Replica.State state = replica.getState();
if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
&& clusterState.liveNodesContain(replica.getNodeName())
&& (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
log.debug("replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.replica.getName(), replica.getName(), replica.getNodeName());
log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
return true;
}
}
}
log.debug("replica does not yet exist on node: {}", baseUrl);
log.debug("collection={} replica does not yet exist on node: {}", badReplica.collection.getName(), baseUrl);
return false;
}
@ -484,7 +497,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
@Override
public String toString() {
return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
+ ourReplicas + "]";
+ ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
}
}

View File

@ -17,6 +17,8 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@ -28,8 +30,6 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -40,7 +40,6 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
@ -51,7 +50,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
@Nightly
@Slow
@ -148,37 +148,66 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
assertTrue(response2.isSuccess());
waitForRecoveriesToFinish(collection2, false);
String collection3 = "solrj_collection3";
createCollectionRequest = new Create();
createCollectionRequest.setCollectionName(collection3);
createCollectionRequest.setNumShards(5);
createCollectionRequest.setReplicationFactor(1);
createCollectionRequest.setMaxShardsPerNode(1);
createCollectionRequest.setConfigName("conf1");
createCollectionRequest.setRouterField("myOwnField");
createCollectionRequest.setAutoAddReplicas(true);
CollectionAdminResponse response3 = createCollectionRequest.process(getCommonCloudSolrClient());
assertEquals(0, response3.getStatus());
assertTrue(response3.isSuccess());
waitForRecoveriesToFinish(collection3, false);
ChaosMonkey.stop(jettys.get(1));
ChaosMonkey.stop(jettys.get(2));
Thread.sleep(3000);
Thread.sleep(5000);
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
assertSliceAndReplicaCount(collection1);
assertEquals(4, getLiveAndActiveCount(collection1));
assertTrue(getLiveAndActiveCount(collection2) < 4);
assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
// collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000);
// collection1 should still be at 4
assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
// and collection2 less than 4
assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
ChaosMonkey.stop(jettys);
ChaosMonkey.stop(controlJetty);
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllNotLive(cloudClient.getZkStateReader(), 45000));
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
ChaosMonkey.start(jettys);
ChaosMonkey.start(controlJetty);
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
assertSliceAndReplicaCount(collection1);
assertSingleReplicationAndShardSize(collection3, 5);
int jettyIndex = random().nextInt(jettys.size());
ChaosMonkey.stop(jettys.get(jettyIndex));
ChaosMonkey.start(jettys.get(jettyIndex));
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
assertSliceAndReplicaCount(collection1);
assertSingleReplicationAndShardSize(collection3, 5);
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
//disable autoAddReplicas
Map m = makeMap(
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
@ -189,7 +218,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
request.setPath("/admin/collections");
cloudClient.request(request);
int currentCount = getLiveAndActiveCount(collection1);
int currentCount = ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1);
ChaosMonkey.stop(jettys.get(3));
@ -197,7 +226,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
//Hence waiting for 30 seconds to be on the safe side.
Thread.sleep(30000);
//Ensures that autoAddReplicas has not kicked in.
assertTrue(currentCount > getLiveAndActiveCount(collection1));
assertTrue(currentCount > ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
//enable autoAddReplicas
m = makeMap(
@ -208,24 +237,17 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
request.setPath("/admin/collections");
cloudClient.request(request);
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
assertSliceAndReplicaCount(collection1);
}
private int getLiveAndActiveCount(String collection1) {
private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
Collection<Slice> slices;
slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection1);
int liveAndActive = 0;
slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);
assertEquals(numSlices, slices.size());
for (Slice slice : slices) {
for (Replica replica : slice.getReplicas()) {
boolean live = cloudClient.getZkStateReader().getClusterState().liveNodesContain(replica.getNodeName());
boolean active = replica.getState() == Replica.State.ACTIVE;
if (live && active) {
liveAndActive++;
}
}
assertEquals(1, slice.getReplicas().size());
}
return liveAndActive;
}
private void assertSliceAndReplicaCount(String collection) {

View File

@ -41,8 +41,8 @@ public class ClusterStateUtil {
* how long to wait before giving up
* @return false if timed out
*/
public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, int timeoutInMs) {
return waitForAllActiveAndLive(zkStateReader, null, timeoutInMs);
public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, int timeoutInMs) {
return waitForAllActiveAndLiveReplicas(zkStateReader, null, timeoutInMs);
}
/**
@ -55,12 +55,12 @@ public class ClusterStateUtil {
* how long to wait before giving up
* @return false if timed out
*/
public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, String collection,
public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, String collection,
int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
while (System.nanoTime() < timeout) {
while (!success && System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
@ -119,7 +119,7 @@ public class ClusterStateUtil {
* how long to wait before giving up
* @return false if timed out
*/
public static boolean waitToSeeLive(ZkStateReader zkStateReader,
public static boolean waitToSeeLiveReplica(ZkStateReader zkStateReader,
String collection, String coreNodeName, String baseUrl,
int timeoutInMs) {
long timeout = System.nanoTime()
@ -162,17 +162,17 @@ public class ClusterStateUtil {
return false;
}
public static boolean waitForAllNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
return waitForAllNotLive(zkStateReader, null, timeoutInMs);
public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
return waitForAllReplicasNotLive(zkStateReader, null, timeoutInMs);
}
public static boolean waitForAllNotLive(ZkStateReader zkStateReader,
public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader,
String collection, int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
while (System.nanoTime() < timeout) {
while (!success && System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
@ -215,6 +215,44 @@ public class ClusterStateUtil {
return success;
}
/**
 * Returns the number of replicas of the given collection that are both in the
 * {@code ACTIVE} state and hosted on a node currently listed as live.
 *
 * @param zkStateReader source of the current cluster state
 * @param collection    collection whose active slices are examined
 * @return count of replicas that are ACTIVE and on a live node
 */
public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
  int count = 0;
  for (Slice slice : zkStateReader.getClusterState().getActiveSlices(collection)) {
    for (Replica replica : slice.getReplicas()) {
      // A replica only counts when its state is ACTIVE *and* its node is live.
      boolean active = replica.getState() == Replica.State.ACTIVE;
      boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
      if (active && live) {
        count++;
      }
    }
  }
  return count;
}
/**
 * Polls the cluster state until the collection has exactly {@code replicaCount}
 * replicas that are ACTIVE and on live nodes, or the timeout elapses.
 *
 * @param zkStateReader source of the current cluster state
 * @param collection    collection to watch
 * @param replicaCount  exact live-and-active replica count to wait for
 * @param timeoutInMs   how long to keep polling before giving up
 * @return true if the target count was observed before the timeout
 * @throws SolrException if the polling thread is interrupted while sleeping
 */
public static boolean waitForLiveAndActiveReplicaCount(ZkStateReader zkStateReader,
    String collection, int replicaCount, int timeoutInMs) {
  final long deadlineNs = System.nanoTime()
      + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
  boolean reached = false;
  while (!reached && System.nanoTime() < deadlineNs) {
    reached = getLiveAndActiveReplicaCount(zkStateReader, collection) == replicaCount;
    if (reached) {
      break; // target count observed; stop polling
    }
    try {
      Thread.sleep(TIMEOUT_POLL_MS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure.
      Thread.currentThread().interrupt();
      throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
    }
  }
  return reached;
}
public static boolean isAutoAddReplicas(ZkStateReader reader, String collection) {
ClusterState clusterState = reader.getClusterState();
if (clusterState != null) {