diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 1dafa7d332c..3589fd8e9aa 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -159,7 +159,15 @@ public class Overseer implements Closeable { log.debug("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); // force flush to ZK after each message because there is no fallback if workQueue items // are removed from workQueue but fail to be written to ZK - clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); + try { + clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); + } catch (Exception e) { + if (isBadMessage(e)) { + log.warn("Exception when process message = {}, consider as bad message and poll out from the queue", message); + workQueue.poll(); + } + throw e; + } workQueue.poll(); // poll-ing removes the element we got by peek-ing data = workQueue.peek(); } @@ -167,33 +175,28 @@ public class Overseer implements Closeable { if (hadWorkItems) { clusterState = zkStateWriter.writePendingUpdates(); } - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e); - return; - } - log.error("Exception in Overseer work queue loop", e); + } catch (KeeperException.SessionExpiredException e) { + log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e); + return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } catch (Exception e) { - log.error("Exception in Overseer work queue loop", e); + log.error("Exception in Overseer when process message from work queue, retrying", e); + refreshClusterState = true; + continue; } } byte[] head = null; try { head = stateUpdateQueue.peek(true); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); - return; - } - log.error("Exception in Overseer main queue loop", e); + } catch (KeeperException.SessionExpiredException e) { + log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); + return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; - } catch (Exception e) { log.error("Exception in Overseer main queue loop", e); } @@ -237,16 +240,9 @@ public class Overseer implements Closeable { // clean work queue while (workQueue.poll() != null); - } catch (KeeperException.BadVersionException bve) { - log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state: {}", bve.getMessage()); - refreshClusterState = true; - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); - return; - } - log.error("Exception in Overseer main queue loop", e); - refreshClusterState = true; // force refresh state in case of all errors + } catch (KeeperException.SessionExpiredException e) { + log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); + return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; @@ -262,6 +258,16 @@ public class Overseer implements Closeable { } } + // Return true whenever the exception thrown by ZkStateWriter is correspond + // to a invalid state or 'bad' message (in this case, we should remove that message from queue) + private boolean isBadMessage(Exception e) { + if (e instanceof KeeperException) { + KeeperException ke = (KeeperException) e; + return ke.code() == KeeperException.Code.NONODE || ke.code() == KeeperException.Code.NODEEXISTS; + } + return !(e instanceof InterruptedException); + } + private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception { final String operation = message.getStr(QUEUE_OPERATION); List zkWriteCommands = null; diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 030be51cdf2..62f97cbdacd 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -689,6 +689,63 @@ public class OverseerTest extends SolrTestCaseJ4 { } } } + + @Test + public void testExceptionWhenFlushClusterState() throws Exception { + String zkDir = createTempDir("zkData").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient controllerClient = null; + SolrZkClient overseerClient = null; + ZkStateReader reader = null; + + try { + server.run(); + controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT); + + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + ZkController.createClusterZkNodes(controllerClient); + + reader = new ZkStateReader(controllerClient); + reader.createClusterStateWatchersAndUpdate(); + + // We did not create /collections -> this message will cause exception when Overseer try to flush the clusterstate + ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), + "name", "collection1", + ZkStateReader.REPLICATION_FACTOR, "1", + ZkStateReader.NUM_SHARDS_PROP, "1", + DocCollection.STATE_FORMAT, "2", + "createNodeSet", ""); + ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), + "name", "collection2", + ZkStateReader.REPLICATION_FACTOR, "1", + ZkStateReader.NUM_SHARDS_PROP, "1", + DocCollection.STATE_FORMAT, "1", + "createNodeSet", ""); + ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(controllerClient, new Overseer.Stats()); + workQueue.offer(Utils.toJSON(badMessage)); + workQueue.offer(Utils.toJSON(goodMessage)); + overseerClient = electNewOverseer(server.getZkAddress()); + waitForCollections(reader, "collection2"); + + ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); + q.offer(Utils.toJSON(badMessage)); + q.offer(Utils.toJSON(goodMessage.plus("name", "collection3"))); + waitForCollections(reader, "collection2", "collection3"); + assertNotNull(reader.getClusterState().getCollectionOrNull("collection2")); + assertNotNull(reader.getClusterState().getCollectionOrNull("collection3")); + + assertTrue(workQueue.peek() == null); + assertTrue(q.peek() == null); + } finally { + close(overseerClient); + close(controllerClient); + close(reader); + server.shutdown(); + } + } @Test public void testShardLeaderChange() throws Exception {