SOLR-5325: ZooKeeper connection loss can cause the Overseer to stop processing commands.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1531313 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-10-11 14:34:49 +00:00
parent 2da0337641
commit 0c6a64493c
9 changed files with 154 additions and 33 deletions

View File

@ -178,6 +178,9 @@ Bug Fixes
* SOLR-5327: SOLR-4915, "The root cause should be returned to the user when a SolrCore create * SOLR-5327: SOLR-4915, "The root cause should be returned to the user when a SolrCore create
call fails", was reverted. (Mark Miller) call fails", was reverted. (Mark Miller)
* SOLR-5325: ZooKeeper connection loss can cause the Overseer to stop processing
commands. (Christine Poerschke, Mark Miller, Jessica Cheng)
================== 4.5.0 ================== ================== 4.5.0 ==================
Versions of Major Components Versions of Major Components

View File

@ -56,9 +56,10 @@ public class Overseer {
private static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates private static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
private static Logger log = LoggerFactory.getLogger(Overseer.class); private static Logger log = LoggerFactory.getLogger(Overseer.class);
static enum LeaderStatus { DONT_KNOW, NO, YES };
private class ClusterStateUpdater implements Runnable, ClosableThread { private class ClusterStateUpdater implements Runnable, ClosableThread {
private final ZkStateReader reader; private final ZkStateReader reader;
@ -82,7 +83,12 @@ public class Overseer {
@Override @Override
public void run() { public void run() {
if (!this.isClosed && amILeader()) { LeaderStatus isLeader = amILeader();
while (isLeader == LeaderStatus.DONT_KNOW) {
log.debug("am_i_leader unclear {}", isLeader);
isLeader = amILeader(); // not a no, not a yes, try ask again
}
if (!this.isClosed && LeaderStatus.YES == isLeader) {
// see if there's something left from the previous Overseer and re // see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state // process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { // XXX this only protects synchronized (reader.getUpdateLock()) { // XXX this only protects
@ -96,14 +102,24 @@ public class Overseer {
ClusterState clusterState = reader.getClusterState(); ClusterState clusterState = reader.getClusterState();
log.info("Replaying operations from work queue."); log.info("Replaying operations from work queue.");
while (head != null && amILeader()) { while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head); isLeader = amILeader();
final String operation = message.getStr(QUEUE_OPERATION); if (LeaderStatus.NO == isLeader) {
clusterState = processMessage(clusterState, message, operation); break;
zkClient.setData(ZkStateReader.CLUSTER_STATE, }
ZkStateReader.toJSON(clusterState), true); else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head);
workQueue.poll(); final String operation = message.getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
log.info("am_i_leader unclear {}", isLeader);
// re-peek below in case our 'head' value is out-of-date by now
}
head = workQueue.peek(); head = workQueue.peek();
} }
@ -126,7 +142,15 @@ public class Overseer {
} }
log.info("Starting to work on the main queue"); log.info("Starting to work on the main queue");
while (!this.isClosed && amILeader()) { while (!this.isClosed) {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
break;
}
else if (LeaderStatus.YES != isLeader) {
log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try ask again
}
synchronized (reader.getUpdateLock()) { synchronized (reader.getUpdateLock()) {
try { try {
byte[] head = stateUpdateQueue.peek(); byte[] head = stateUpdateQueue.peek();
@ -275,20 +299,28 @@ public class Overseer {
return clusterState; return clusterState;
} }
private boolean amILeader() { private LeaderStatus amILeader() {
try { try {
ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true)); ZkNodeProps props = ZkNodeProps.load(zkClient.getData(
if(myId.equals(props.getStr("id"))) { "/overseer_elect/leader", null, null, true));
return true; if (myId.equals(props.getStr("id"))) {
} return LeaderStatus.YES;
} catch (KeeperException e) {
log.warn("", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} }
log.info("According to ZK I (id=" + myId + ") am no longer a leader."); } catch (KeeperException e) {
return false; if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.error("", e);
return LeaderStatus.DONT_KNOW;
} else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.info("", e);
} else {
log.warn("", e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} }
log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
return LeaderStatus.NO;
}
/** /**
* Try to assign core to the cluster. * Try to assign core to the cluster.

View File

@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent; import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.Aliases;
@ -140,8 +141,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
@Override @Override
public void run() { public void run() {
log.info("Process current queue of collection creations"); log.info("Process current queue of collection creations");
while (amILeader() && !isClosed) { LeaderStatus isLeader = amILeader();
while (isLeader == LeaderStatus.DONT_KNOW) {
log.debug("am_i_leader unclear {}", isLeader);
isLeader = amILeader(); // not a no, not a yes, try ask again
}
while (!this.isClosed) {
try { try {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
break;
}
else if (LeaderStatus.YES != isLeader) {
log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try asking again
}
QueueEvent head = workQueue.peek(true); QueueEvent head = workQueue.peek(true);
final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString()); log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
@ -172,20 +187,27 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
isClosed = true; isClosed = true;
} }
protected boolean amILeader() { protected LeaderStatus amILeader() {
try { try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData( ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
"/overseer_elect/leader", null, null, true)); "/overseer_elect/leader", null, null, true));
if (myId.equals(props.getStr("id"))) { if (myId.equals(props.getStr("id"))) {
return true; return LeaderStatus.YES;
} }
} catch (KeeperException e) { } catch (KeeperException e) {
log.warn("", e); if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.error("", e);
return LeaderStatus.DONT_KNOW;
} else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.info("", e);
} else {
log.warn("", e);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
log.info("According to ZK I (id=" + myId + ") am no longer a leader."); log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
return false; return LeaderStatus.NO;
} }

View File

@ -23,10 +23,10 @@ import java.util.List;
import org.apache.lucene.util.LuceneTestCase.BadApple; import org.apache.lucene.util.LuceneTestCase.BadApple;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer; import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.impl.HttpSolrServer;
@ -205,6 +205,29 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
if (VERBOSE) System.out.println("control docs:" if (VERBOSE) System.out.println("control docs:"
+ controlClient.query(new SolrQuery("*:*")).getResults() + controlClient.query(new SolrQuery("*:*")).getResults()
.getNumFound() + "\n\n"); .getNumFound() + "\n\n");
// try and make a collection to make sure the overseer has survived the expiration and session loss
// sometimes we restart zookeeper as well
if (random().nextBoolean()) {
zkServer.shutdown();
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
zkServer.run();
}
CloudSolrServer client = createCloudClient("collection1");
try {
createCollection(null, "testcollection",
1, 1, 1, client, null, "conf1");
} finally {
client.shutdown();
}
List<Integer> numShardsNumReplicas = new ArrayList<Integer>(2);
numShardsNumReplicas.add(1);
numShardsNumReplicas.add(1);
checkForCollection("testcollection",numShardsNumReplicas, null);
testsSuccesful = true; testsSuccesful = true;
} finally { } finally {
if (!testsSuccesful) { if (!testsSuccesful) {

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.Diagnostics; import org.apache.solr.core.Diagnostics;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
@ -137,6 +138,28 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
checkShardConsistency(true, true); checkShardConsistency(true, true);
if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n"); if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
// try and make a collection to make sure the overseer has survived the expiration and session loss
// sometimes we restart zookeeper as well
if (random().nextBoolean()) {
zkServer.shutdown();
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
zkServer.run();
}
CloudSolrServer client = createCloudClient("collection1");
try {
createCollection(null, "testcollection",
1, 1, 1, client, null, "conf1");
} finally {
client.shutdown();
}
List<Integer> numShardsNumReplicas = new ArrayList<Integer>(2);
numShardsNumReplicas.add(1);
numShardsNumReplicas.add(1);
checkForCollection("testcollection",numShardsNumReplicas, null);
} }
private void randomlyEnableAutoSoftCommit() { private void randomlyEnableAutoSoftCommit() {

View File

@ -605,6 +605,19 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
} }
} }
// sometimes we restart zookeeper
if (random().nextBoolean()) {
zkServer.shutdown();
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
zkServer.run();
}
// sometimes we cause a connection loss - sometimes it will hit the overseer
if (random().nextBoolean()) {
JettySolrRunner jetty = jettys.get(random().nextInt(jettys.size()));
ChaosMonkey.causeConnectionLoss(jetty);
}
ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader(); ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader();
for (int j = 0; j < cnt; j++) { for (int j = 0; j < cnt; j++) {
waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false); waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false);

View File

@ -21,6 +21,7 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent; import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
@ -101,8 +102,8 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
} }
@Override @Override
protected boolean amILeader() { protected LeaderStatus amILeader() {
return true; return LeaderStatus.YES;
} }
} }

View File

@ -153,11 +153,11 @@ public class ChaosMonkey {
} }
} }
private void causeConnectionLoss(JettySolrRunner jetty) { public static void causeConnectionLoss(JettySolrRunner jetty) {
causeConnectionLoss(jetty, ZkTestServer.TICK_TIME * 2 + 200); causeConnectionLoss(jetty, ZkTestServer.TICK_TIME * 2 + 200);
} }
private void causeConnectionLoss(JettySolrRunner jetty, int pauseTime) { public static void causeConnectionLoss(JettySolrRunner jetty, int pauseTime) {
SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilter) jetty SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilter) jetty
.getDispatchFilter().getFilter(); .getDispatchFilter().getFilter();
if (solrDispatchFilter != null) { if (solrDispatchFilter != null) {

View File

@ -350,4 +350,8 @@ public class ZkTestServer {
public void setTheTickTime(int theTickTime) { public void setTheTickTime(int theTickTime) {
this.theTickTime = theTickTime; this.theTickTime = theTickTime;
} }
public String getZkDir() {
return zkDir;
}
} }