diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1a5b7bca98d..09aec08ae9f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -178,6 +178,9 @@ Bug Fixes * SOLR-5327: SOLR-4915, "The root cause should be returned to the user when a SolrCore create call fails", was reverted. (Mark Miller) +* SOLR-5325: ZooKeeper connection loss can cause the Overseer to stop processing + commands. (Christine Poerschke, Mark Miller, Jessica Cheng) + ================== 4.5.0 ================== Versions of Major Components diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 1a9a3ab82c4..695cd7b2861 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -56,9 +56,10 @@ public class Overseer { private static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates - private static Logger log = LoggerFactory.getLogger(Overseer.class); + static enum LeaderStatus { DONT_KNOW, NO, YES }; + private class ClusterStateUpdater implements Runnable, ClosableThread { private final ZkStateReader reader; @@ -82,7 +83,12 @@ public class Overseer { @Override public void run() { - if (!this.isClosed && amILeader()) { + LeaderStatus isLeader = amILeader(); + while (isLeader == LeaderStatus.DONT_KNOW) { + log.debug("am_i_leader unclear {}", isLeader); + isLeader = amILeader(); // not a no, not a yes, try ask again + } + if (!this.isClosed && LeaderStatus.YES == isLeader) { // see if there's something left from the previous Overseer and re // process all events that were not persisted into cloud state synchronized (reader.getUpdateLock()) { // XXX this only protects @@ -96,14 +102,24 @@ public class Overseer { ClusterState clusterState = reader.getClusterState(); log.info("Replaying operations from work queue."); - while (head != null && amILeader()) { - final ZkNodeProps message = ZkNodeProps.load(head); - final String operation = message.getStr(QUEUE_OPERATION); - clusterState = processMessage(clusterState, message, operation); - zkClient.setData(ZkStateReader.CLUSTER_STATE, - ZkStateReader.toJSON(clusterState), true); - - workQueue.poll(); + while (head != null) { + isLeader = amILeader(); + if (LeaderStatus.NO == isLeader) { + break; + } + else if (LeaderStatus.YES == isLeader) { + final ZkNodeProps message = ZkNodeProps.load(head); + final String operation = message.getStr(QUEUE_OPERATION); + clusterState = processMessage(clusterState, message, operation); + zkClient.setData(ZkStateReader.CLUSTER_STATE, + ZkStateReader.toJSON(clusterState), true); + + workQueue.poll(); // poll-ing removes the element we got by peek-ing + } + else { + log.info("am_i_leader unclear {}", isLeader); + // re-peek below in case our 'head' value is out-of-date by now + } head = workQueue.peek(); } @@ -126,7 +142,15 @@ public class Overseer { } log.info("Starting to work on the main queue"); - while (!this.isClosed && amILeader()) { + while (!this.isClosed) { + isLeader = amILeader(); + if (LeaderStatus.NO == isLeader) { + break; + } + else if (LeaderStatus.YES != isLeader) { + log.debug("am_i_leader unclear {}", isLeader); + continue; // not a no, not a yes, try ask again + } synchronized (reader.getUpdateLock()) { try { byte[] head = stateUpdateQueue.peek(); @@ -275,20 +299,28 @@ public class Overseer { return clusterState; } - private boolean amILeader() { - try { - ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true)); - if(myId.equals(props.getStr("id"))) { - return true; - } - } catch (KeeperException e) { - log.warn("", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + private LeaderStatus amILeader() { + try { + ZkNodeProps props = ZkNodeProps.load(zkClient.getData( + "/overseer_elect/leader", null, null, true)); + if (myId.equals(props.getStr("id"))) { + return LeaderStatus.YES; } - log.info("According to ZK I (id=" + myId + ") am no longer a leader."); - return false; + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.CONNECTIONLOSS) { + log.error("", e); + return LeaderStatus.DONT_KNOW; + } else if (e.code() == KeeperException.Code.SESSIONEXPIRED) { + log.info("", e); + } else { + log.warn("", e); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } + log.info("According to ZK I (id=" + myId + ") am no longer a leader."); + return LeaderStatus.NO; + } /** * Try to assign core to the cluster. diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index bb2f452cf78..ffffd02188e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.cloud.DistributedQueue.QueueEvent; +import org.apache.solr.cloud.Overseer.LeaderStatus; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; @@ -140,8 +141,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { @Override public void run() { log.info("Process current queue of collection creations"); - while (amILeader() && !isClosed) { + LeaderStatus isLeader = amILeader(); + while (isLeader == LeaderStatus.DONT_KNOW) { + log.debug("am_i_leader unclear {}", isLeader); + isLeader = amILeader(); // not a no, not a yes, try ask again + } + while (!this.isClosed) { try { + isLeader = amILeader(); + if (LeaderStatus.NO == isLeader) { + break; + } + else if (LeaderStatus.YES != isLeader) { + log.debug("am_i_leader unclear {}", isLeader); + continue; // not a no, not a yes, try asking again + } + QueueEvent head = workQueue.peek(true); final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString()); @@ -172,20 +187,27 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { isClosed = true; } - protected boolean amILeader() { + protected LeaderStatus amILeader() { try { ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData( "/overseer_elect/leader", null, null, true)); if (myId.equals(props.getStr("id"))) { - return true; + return LeaderStatus.YES; } } catch (KeeperException e) { - log.warn("", e); + if (e.code() == KeeperException.Code.CONNECTIONLOSS) { + log.error("", e); + return LeaderStatus.DONT_KNOW; + } else if (e.code() == KeeperException.Code.SESSIONEXPIRED) { + log.info("", e); + } else { + log.warn("", e); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } log.info("According to ZK I (id=" + myId + ") am no longer a leader."); - return false; + return LeaderStatus.NO; } diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java index 8558bc567a2..2889d9a89f9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java @@ -23,10 +23,10 @@ import java.util.List; import org.apache.lucene.util.LuceneTestCase.BadApple; import org.apache.lucene.util.LuceneTestCase.Slow; - import org.apache.http.client.HttpClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer; import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.impl.HttpSolrServer; @@ -205,6 +205,29 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults() .getNumFound() + "\n\n"); + + // try and make a collection to make sure the overseer has survived the expiration and session loss + + // sometimes we restart zookeeper as well + if (random().nextBoolean()) { + zkServer.shutdown(); + zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort()); + zkServer.run(); + } + + CloudSolrServer client = createCloudClient("collection1"); + try { + createCollection(null, "testcollection", + 1, 1, 1, client, null, "conf1"); + + } finally { + client.shutdown(); + } + List numShardsNumReplicas = new ArrayList(2); + numShardsNumReplicas.add(1); + numShardsNumReplicas.add(1); + checkForCollection("testcollection",numShardsNumReplicas, null); + testsSuccesful = true; } finally { if (!testsSuccesful) { diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java index 19375248414..5ccd6225cf6 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.core.Diagnostics; import org.apache.solr.core.SolrCore; @@ -137,6 +138,28 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase { checkShardConsistency(true, true); if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n"); + + // try and make a collection to make sure the overseer has survived the expiration and session loss + + // sometimes we restart zookeeper as well + if (random().nextBoolean()) { + zkServer.shutdown(); + zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort()); + zkServer.run(); + } + + CloudSolrServer client = createCloudClient("collection1"); + try { + createCollection(null, "testcollection", + 1, 1, 1, client, null, "conf1"); + + } finally { + client.shutdown(); + } + List numShardsNumReplicas = new ArrayList(2); + numShardsNumReplicas.add(1); + numShardsNumReplicas.add(1); + checkForCollection("testcollection",numShardsNumReplicas, null); } private void randomlyEnableAutoSoftCommit() { diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java index 90aedcad8ae..432baf4453e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java @@ -605,6 +605,19 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa } } + // sometimes we restart zookeeper + if (random().nextBoolean()) { + zkServer.shutdown(); + zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort()); + zkServer.run(); + } + + // sometimes we cause a connection loss - sometimes it will hit the overseer + if (random().nextBoolean()) { + JettySolrRunner jetty = jettys.get(random().nextInt(jettys.size())); + ChaosMonkey.causeConnectionLoss(jetty); + } + ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader(); for (int j = 0; j < cnt; j++) { waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false); diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java index cead773a920..6d1b30a7b3d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java @@ -21,6 +21,7 @@ import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.DistributedQueue.QueueEvent; +import org.apache.solr.cloud.Overseer.LeaderStatus; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; @@ -101,8 +102,8 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 { } @Override - protected boolean amILeader() { - return true; + protected LeaderStatus amILeader() { + return LeaderStatus.YES; } } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java index a4178ca9216..9d33ea36006 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java @@ -153,11 +153,11 @@ public class ChaosMonkey { } } - private void causeConnectionLoss(JettySolrRunner jetty) { + public static void causeConnectionLoss(JettySolrRunner jetty) { causeConnectionLoss(jetty, ZkTestServer.TICK_TIME * 2 + 200); } - private void causeConnectionLoss(JettySolrRunner jetty, int pauseTime) { + public static void causeConnectionLoss(JettySolrRunner jetty, int pauseTime) { SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilter) jetty .getDispatchFilter().getFilter(); if (solrDispatchFilter != null) { diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java index 3729830ce26..776945633a6 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java @@ -350,4 +350,8 @@ public class ZkTestServer { public void setTheTickTime(int theTickTime) { this.theTickTime = theTickTime; } + + public String getZkDir() { + return zkDir; + } }