SOLR-4421,SOLR-4165: On CoreContainer shutdown, all SolrCores should publish their state as DOWN.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1446914 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-02-16 16:59:53 +00:00
parent 3a4103d19b
commit 384d42b5e3
4 changed files with 90 additions and 89 deletions

View File

@ -129,6 +129,9 @@ Bug Fixes
kick in when using NRTCachingDirectory or the rate limiting feature.
(Mark Miller)
* SOLR-4421,SOLR-4165: On CoreContainer shutdown, all SolrCores should publish their
state as DOWN. (Mark Miller, Markus Jelsma)
Optimizations
----------------------
@ -161,9 +164,6 @@ Other Changes
* SOLR-4384: Make post.jar report timing information (Upayavira via janhoy)
* SOLR-4421: On CoreContainer shutdown, all SolrCores should publish their
state as DOWN. (Mark Miller)
================== 4.1.0 ==================
Versions of Major Components

View File

@ -480,9 +480,14 @@ public final class ZkController {
}
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
boolean alreadyCreatedZkReader = false;
try {
alreadyCreatedZkReader = publishAndWaitForDownStates(alreadyCreatedZkReader);
boolean createdWatchesAndUpdated = false;
if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) {
zkStateReader.createClusterStateWatchersAndUpdate();
createdWatchesAndUpdated = true;
publishAndWaitForDownStates();
}
// makes nodes zkNode
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
@ -501,7 +506,7 @@ public final class ZkController {
overseerElector.setup(context);
overseerElector.joinElection(context, false);
if (!alreadyCreatedZkReader) {
if (!createdWatchesAndUpdated) {
zkStateReader.createClusterStateWatchersAndUpdate();
}
@ -523,93 +528,92 @@ public final class ZkController {
}
private boolean publishAndWaitForDownStates(boolean alreadyCreatedZkReader)
throws KeeperException, InterruptedException {
if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) {
alreadyCreatedZkReader = true;
// try and publish anyone from our node as down
zkStateReader.createClusterStateWatchersAndUpdate();
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> collections = clusterState.getCollections();
List<String> updatedNodes = new ArrayList<String>();
public void publishAndWaitForDownStates() throws KeeperException,
InterruptedException {
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> collections = clusterState.getCollections();
List<String> updatedNodes = new ArrayList<String>();
for (String collectionName : collections) {
DocCollection collection = clusterState.getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(getNodeName())
&& !(replica.getStr(ZkStateReader.STATE_PROP)
.equals(ZkStateReader.DOWN))) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
ZkStateReader.CORE_NAME_PROP,
replica.getStr(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.ROLES_PROP,
replica.getStr(ZkStateReader.ROLES_PROP),
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.SHARD_ID_PROP,
replica.getStr(ZkStateReader.SHARD_ID_PROP),
ZkStateReader.COLLECTION_PROP,
replica.getStr(ZkStateReader.COLLECTION_PROP));
updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
}
}
}
// now wait till the updates are in our state
long now = System.currentTimeMillis();
long timeout = now + 1000 * 300;
boolean foundStates = false;
while (System.currentTimeMillis() < timeout) {
clusterState = zkStateReader.getClusterState();
collections = clusterState.getCollections();
for (String collectionName : collections) {
DocCollection collection = clusterState.getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(getNodeName())
&& !(replica.getStr(ZkStateReader.STATE_PROP)
.equals(ZkStateReader.DOWN))) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
"state", ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, replica.getStr(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.ROLES_PROP,
replica.getStr(ZkStateReader.ROLES_PROP),
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.SHARD_ID_PROP,
replica.getStr(ZkStateReader.SHARD_ID_PROP),
ZkStateReader.COLLECTION_PROP,
replica.getStr(ZkStateReader.COLLECTION_PROP));
updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
if (replica.getStr(ZkStateReader.STATE_PROP).equals(
ZkStateReader.DOWN)) {
updatedNodes.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
}
}
}
}
// now wait till the updates are in our state
long now = System.currentTimeMillis();
long timeout = now + 1000 * 300;
boolean foundStates = false;
while (System.currentTimeMillis() < timeout) {
clusterState = zkStateReader.getClusterState();
collections = clusterState.getCollections();
for (String collectionName : collections) {
DocCollection collection = clusterState
.getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (replica.getStr(ZkStateReader.STATE_PROP).equals(
ZkStateReader.DOWN)) {
updatedNodes.remove(replica
.getStr(ZkStateReader.CORE_NAME_PROP));
}
}
}
}
if (updatedNodes.size() == 0) {
foundStates = true;
break;
}
}
if (!foundStates) {
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
if (updatedNodes.size() == 0) {
foundStates = true;
break;
}
}
return alreadyCreatedZkReader;
if (!foundStates) {
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
}
}
/**
* Validates if the chroot exists in zk (or if it is successfully created). Optionally, if create is set to true this method will create the path
* in case it doesn't exist
* @return true if the path exists or is created
* false if the path doesn't exist and 'create' = false
* Validates if the chroot exists in zk (or if it is successfully created).
* Optionally, if create is set to true this method will create the path in
* case it doesn't exist
*
* @return true if the path exists or is created false if the path doesn't
* exist and 'create' = false
*/
public static boolean checkChrootPath(String zkHost, boolean create) throws KeeperException, InterruptedException {
if(!containsChroot(zkHost)) {
public static boolean checkChrootPath(String zkHost, boolean create)
throws KeeperException, InterruptedException {
if (!containsChroot(zkHost)) {
return true;
}
log.info("zkHost includes chroot");
String chrootPath = zkHost.substring(zkHost.indexOf("/"), zkHost.length());
SolrZkClient tmpClient = new SolrZkClient(zkHost.substring(0, zkHost.indexOf("/")), 60*1000);
SolrZkClient tmpClient = new SolrZkClient(zkHost.substring(0,
zkHost.indexOf("/")), 60 * 1000);
boolean exists = tmpClient.exists(chrootPath, true);
if(!exists && create) {
if (!exists && create) {
tmpClient.makePath(chrootPath, false, true);
exists = true;
}
@ -617,7 +621,6 @@ public final class ZkController {
return exists;
}
/**
* Validates if zkHost contains a chroot. See http://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions
*/

View File

@ -740,12 +740,24 @@ public class CoreContainer
public void shutdown() {
log.info("Shutting down CoreContainer instance="
+ System.identityHashCode(this));
if (isZooKeeperAware()) {
try {
zkController.publishAndWaitForDownStates();
} catch (KeeperException e) {
log.error("", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("", e);
}
}
isShutDown = true;
if (isZooKeeperAware()) {
publishCoresAsDown();
cancelCoreRecoveries();
}
try {
synchronized (cores) {
@ -784,20 +796,6 @@ public class CoreContainer
}
}
private void publishCoresAsDown() {
synchronized (cores) {
for (SolrCore core : cores.values()) {
try {
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
} catch (KeeperException e) {
log.error("", e);
} catch (InterruptedException e) {
log.error("", e);
}
}
}
}
public void cancelCoreRecoveries() {
ArrayList<SolrCoreState> coreStates = new ArrayList<SolrCoreState>();
synchronized (cores) {

View File

@ -207,7 +207,6 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
if (DEBUG) {
printLayout();
}
zkServer.shutdown();
System.clearProperty("zkHost");
System.clearProperty("collection");
System.clearProperty("enable.update.log");
@ -217,6 +216,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
System.clearProperty("solr.test.sys.prop2");
resetExceptionIgnores();
super.tearDown();
zkServer.shutdown();
}
protected void printLayout() throws Exception {