SOLR-7736: Fix ZkController.publishAndWaitForDownStates

Shalin Shekhar Mangar 2018-03-27 10:28:34 +05:30
parent e51735d8ea
commit ecb94ba442
2 changed files with 66 additions and 30 deletions

@@ -898,18 +898,19 @@ public class ZkController {
publishNodeAsDown(getNodeName());
Set<String> collectionsWithLocalReplica = ConcurrentHashMap.newKeySet();
for (SolrCore core : cc.getCores()) {
collectionsWithLocalReplica.add(core.getCoreDescriptor().getCloudDescriptor().getCollectionName());
for (CoreDescriptor descriptor : cc.getCoreDescriptors()) {
collectionsWithLocalReplica.add(descriptor.getCloudDescriptor().getCollectionName());
}
CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> {
if (collectionState == null) return false;
boolean foundStates = true;
for (SolrCore core : cc.getCores()) {
if (core.getCoreDescriptor().getCloudDescriptor().getCollectionName().equals(collectionWithLocalReplica)) {
Replica replica = collectionState.getReplica(core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if (replica.getState() != Replica.State.DOWN) {
for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
if (coreDescriptor.getCloudDescriptor().getCollectionName().equals(collectionWithLocalReplica)) {
Replica replica = collectionState.getReplica(coreDescriptor.getCloudDescriptor().getCoreNodeName());
if (replica == null || replica.getState() != Replica.State.DOWN) {
foundStates = false;
}
}

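The hunk ends just after the new null check, so below is a minimal, standalone sketch of the wait pattern the fix relies on: a per-collection predicate that only succeeds once every locally hosted replica is reported DOWN (a missing replica counts as not-down), a shared CountDownLatch, and a bounded await. All class, method, and variable names and the 5-second timeout are illustrative stand-ins, not part of this diff or of the Solr API.

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DownStateWaitSketch {
    // Mirrors the watcher loop above: a replica missing from the cluster state
    // (null) is treated the same as a replica that is not yet DOWN.
    static boolean allLocalReplicasDown(Map<String, String> replicaStatesByCoreNodeName,
                                        List<String> localCoreNodeNames) {
        for (String coreNodeName : localCoreNodeNames) {
            String state = replicaStatesByCoreNodeName.get(coreNodeName);
            if (state == null || !"down".equals(state)) {
                return false; // keep waiting
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // One latch count per collection that hosts a local replica.
        CountDownLatch latch = new CountDownLatch(1);
        Map<String, String> replicaStates = Map.of("core_node1", "down", "core_node2", "down");
        if (allLocalReplicasDown(replicaStates, List.of("core_node1", "core_node2"))) {
            latch.countDown();
        }
        // Bounded wait, analogous to a WAIT_DOWN_STATES_TIMEOUT_SECONDS-style timeout.
        System.out.println("all published down: " + latch.await(5, TimeUnit.SECONDS));
    }
}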

@@ -20,11 +20,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
@@ -34,11 +36,16 @@ import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.update.UpdateShardHandlerConfig;
import org.apache.solr.util.LogLevel;
import org.apache.zookeeper.CreateMode;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@Slow
@SolrTestCaseJ4.SuppressSSL
public class ZkControllerTest extends SolrTestCaseJ4 {
@@ -251,11 +258,26 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
}
}
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028, https://issues.apache.org/jira/browse/SOLR-7736")
@Slow
@LogLevel(value = "org.apache.solr.cloud=DEBUG;org.apache.solr.cloud.overseer=DEBUG")
public void testPublishAndWaitForDownStates() throws Exception {
String zkDir = createTempDir("testPublishAndWaitForDownStates").toFile().getAbsolutePath();
/*
This test asserts that if zkController.publishAndWaitForDownStates uses only the core name to check whether all
local cores are down, then the method returns immediately, but if it uses the coreNodeName (as it does after
SOLR-6665), then the method times out.
We set up the cluster state so that two replicas with the same core name exist on non-existent nodes, and the
core container also holds a dummy core with that same core name. Before SOLR-6665, publishAndWaitForDownStates
would only check core names and therefore return immediately; after SOLR-6665 it should time out.
*/
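To make the contrast described in the comment above concrete, here is a standalone illustration of the two lookups, using plain maps in place of the DocCollection/Replica objects. The replica names and states mirror the ones this test seeds, but the class and variable names are purely illustrative.

import java.util.Map;

class LookupContrastSketch {
    public static void main(String[] args) {
        // The seeded cluster state, reduced to two maps keyed by coreNodeName.
        Map<String, String> coreNameByCoreNodeName = Map.of(
                "core_node1", "testPublishAndWaitForDownStates",
                "core_node2", "testPublishAndWaitForDownStates");
        Map<String, String> stateByCoreNodeName = Map.of(
                "core_node1", "active",
                "core_node2", "down");

        // Pre-SOLR-6665 style: match on core name. A replica with the dummy
        // core's name exists in the state, so the old check can finish quickly.
        boolean coreNameExists = coreNameByCoreNodeName.containsValue("testPublishAndWaitForDownStates");

        // Post-SOLR-6665 style: look up the local descriptor's coreNodeName
        // ("core_node0"), which is not in the state, so the replica is never
        // observed as DOWN and publishAndWaitForDownStates must run to its timeout.
        String stateForLocalCoreNode = stateByCoreNodeName.get("core_node0");

        System.out.println("by core name: " + coreNameExists
                + ", by coreNodeName: " + stateForLocalCoreNode);
    }
}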
assumeWorkingMockito();
final String collectionName = "testPublishAndWaitForDownStates";
String zkDir = createTempDir(collectionName).toFile().getAbsolutePath();
CoreContainer cc = null;
String nodeName = "127.0.0.1:8983_solr";
ZkTestServer server = new ZkTestServer(zkDir);
try {
server.run();
@@ -263,7 +285,16 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
cc = getCoreContainer();
cc = new MockCoreContainer() {
@Override
public List<CoreDescriptor> getCoreDescriptors() {
CoreDescriptor descriptor = new CoreDescriptor(collectionName, TEST_PATH(), Collections.emptyMap(), new Properties(), true);
// non-existent coreNodeName: this causes zkController.publishAndWaitForDownStates to wait until it times out
// when matching on coreNodeName, whereas matching on the core name alone would return immediately
descriptor.getCloudDescriptor().setCoreNodeName("core_node0");
return Collections.singletonList(descriptor);
}
};
ZkController zkController = null;
try {
@@ -277,28 +308,32 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
}
});
HashMap<String, DocCollection> collectionStates = new HashMap<>();
HashMap<String, Replica> replicas = new HashMap<>();
// add two replicas with the same core name but one of them should be on a different node
// than this ZkController instance
for (int i=1; i<=2; i++) {
Replica r = new Replica("core_node" + i,
map(ZkStateReader.STATE_PROP, i == 1 ? "active" : "down",
ZkStateReader.NODE_NAME_PROP, i == 1 ? "127.0.0.1:8983_solr" : "non_existent_host",
ZkStateReader.CORE_NAME_PROP, "collection1"));
replicas.put("core_node" + i, r);
}
HashMap<String, Object> sliceProps = new HashMap<>();
sliceProps.put("state", Slice.State.ACTIVE.toString());
Slice slice = new Slice("shard1", replicas, sliceProps);
DocCollection c = new DocCollection("testPublishAndWaitForDownStates", map("shard1", slice), Collections.emptyMap(), DocRouter.DEFAULT);
ClusterState state = new ClusterState(0, Collections.emptySet(), map("testPublishAndWaitForDownStates", c));
byte[] bytes = Utils.toJSON(state);
zkController.getZkClient().makePath(ZkStateReader.getCollectionPath("testPublishAndWaitForDownStates"), bytes, CreateMode.PERSISTENT, true);
zkController.getZkClient().makePath(ZkStateReader.getCollectionPathRoot(collectionName), new byte[0], CreateMode.PERSISTENT, true);
zkController.getZkStateReader().forceUpdateCollection("testPublishAndWaitForDownStates");
assertTrue(zkController.getZkStateReader().getClusterState().hasCollection("testPublishAndWaitForDownStates"));
assertNotNull(zkController.getZkStateReader().getClusterState().getCollection("testPublishAndWaitForDownStates"));
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.NUM_SHARDS_PROP, "1",
"name", collectionName, DocCollection.STATE_FORMAT, "2");
zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, "shard1");
propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host1");
propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
propMap.put(ZkStateReader.STATE_PROP, "active");
zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, "shard1");
propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host2");
propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
propMap.put(ZkStateReader.STATE_PROP, "down");
zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
long now = System.nanoTime();
long timeout = now + TimeUnit.NANOSECONDS.convert(ZkController.WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
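The section cuts off right after the deadline bookkeeping begins; as a rough, standalone sketch of that pattern only (not of the rest of the test), the snippet below takes a monotonic start time, converts a timeout to nanoseconds, waits on something that can never complete, and then checks that the deadline has passed. The two-second timeout and the latch are illustrative stand-ins, not values from this diff.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DeadlineSketch {
    public static void main(String[] args) throws InterruptedException {
        long timeoutSeconds = 2; // illustrative stand-in for the real timeout constant
        long start = System.nanoTime();
        long deadline = start + TimeUnit.NANOSECONDS.convert(timeoutSeconds, TimeUnit.SECONDS);

        // A wait that can never be satisfied, analogous to waiting for a replica
        // that does not exist in the cluster state.
        new CountDownLatch(1).await(timeoutSeconds, TimeUnit.SECONDS);

        // If the wait really did time out, at least timeoutSeconds have elapsed.
        System.out.println("deadline reached: " + (System.nanoTime() >= deadline));
    }
}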