diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 04430677f41..b9b572cc005 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -116,6 +116,9 @@ Bug Fixes
 * SOLR-5761: HttpSolrServer has a few fields that can be set via setters but are not
   volatile. (Mark Miller, Gregory Chanan)
 
+* SOLR-5811: The Overseer will retry work items until success, which is a serious
+  problem if you hit a bad work item. (Mark Miller)
+
 Optimizations
 ----------------------
 * SOLR-1880: Distributed Search skips GET_FIELDS stage if EXECUTE_QUERY
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 666c7134d00..8bf202ce8ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -123,7 +123,16 @@ public class Overseer {
           else if (LeaderStatus.YES == isLeader) {
             final ZkNodeProps message = ZkNodeProps.load(head);
             final String operation = message.getStr(QUEUE_OPERATION);
-            clusterState = processMessage(clusterState, message, operation);
+            try {
+              clusterState = processMessage(clusterState, message, operation);
+            } catch (Exception e) {
+              // generally there is nothing we can do - in most cases, we have
+              // an issue that will fail again on retry or we cannot communicate with
+              // ZooKeeper in which case another Overseer should take over
+              // TODO: if ordering for the message is not important, we could
+              // track retries and put it back on the end of the queue
+              log.error("Could not process Overseer message", e);
+            }
             zkClient.setData(ZkStateReader.CLUSTER_STATE,
                 ZkStateReader.toJSON(clusterState), true);
 
@@ -189,8 +198,16 @@ public class Overseer {
             while (head != null) {
               final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
               final String operation = message.getStr(QUEUE_OPERATION);
-
-              clusterState = processMessage(clusterState, message, operation);
+              try {
+                clusterState = processMessage(clusterState, message, operation);
+              } catch (Exception e) {
+                // generally there is nothing we can do - in most cases, we have
+                // an issue that will fail again on retry or we cannot communicate with
+                // ZooKeeper in which case another Overseer should take over
+                // TODO: if ordering for the message is not important, we could
+                // track retries and put it back on the end of the queue
+                log.error("Could not process Overseer message", e);
+              }
               workQueue.offer(head.getBytes());
 
               stateUpdateQueue.poll();
@@ -294,6 +311,7 @@ public class Overseer {
     private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
       log.info("createReplica() {} ", message);
       String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, coll);
       String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
       Slice sl = clusterState.getSlice(coll, slice);
       if(sl == null){
@@ -334,6 +352,7 @@ public class Overseer {
 
     private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
       for (String key : message.keySet()) {
         if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
@@ -358,6 +377,7 @@ public class Overseer {
 
     private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
       String routeKey = message.getStr("routeKey");
       String range = message.getStr("range");
@@ -397,8 +417,15 @@ public class Overseer {
       return clusterState;
     }
 
+    private void checkCollection(ZkNodeProps message, String collection) {
+      if (collection == null || collection.trim().length() == 0) {
+        log.error("Skipping invalid Overseer message because it has no collection specified: " + message);
+      }
+    }
+
     private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
       String routeKeyStr = message.getStr("routeKey");
 
@@ -424,6 +451,7 @@ public class Overseer {
 
     private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
       Slice slice = clusterState.getSlice(collection, shardId);
       if (slice == null) {
@@ -470,6 +498,7 @@ public class Overseer {
 
     private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
 
       if(collection==null || sliceName == null){
@@ -490,9 +519,7 @@ public class Overseer {
      */
     private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
       final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      assert collection.length() > 0 : message;
-
-
+      checkCollection(message, collection);
       Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
       log.info("Update state numShards={} message={}", numShards, message);
 
@@ -851,9 +878,7 @@ public class Overseer {
 
     private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
       final String collection = message.getStr("name");
-
-//      final Map newCollections = new LinkedHashMap(clusterState.getCollectionStates()); // shallow copy
-//      newCollections.remove(collection);
+      checkCollection(message, collection);
 
 //      ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
       return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
@@ -864,6 +889,7 @@ public class Overseer {
      */
     private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
       final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
 
       log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
@@ -889,6 +915,7 @@ public class Overseer {
       String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
 
       final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
 
 //      final Map newCollections = new LinkedHashMap(clusterState.getCollectionStates()); // shallow copy
 //      DocCollection coll = newCollections.get(collection);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 27dd8c196e9..ac36d4c5e2f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1064,6 +1064,12 @@ public final class ZkController {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
     assert collection != null;
+
+    if (collection == null || collection.trim().length() == 0) {
+      log.error("No collection was specified.");
+      return;
+    }
+
     ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
 
     if (context != null) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index f63b1f9df99..a67a8be527a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -64,18 +64,18 @@ public class OverseerTest extends SolrTestCaseJ4 {
   private List overseers = new ArrayList();
   private List readers = new ArrayList();
 
+  private String collection = "collection1";
+
   public static class MockZKController{
 
     private final SolrZkClient zkClient;
     private final ZkStateReader zkStateReader;
     private final String nodeName;
-    private final String collection;
     private final LeaderElector elector;
     private final Map electionContext = Collections.synchronizedMap(new HashMap());
 
-    public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
+    public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException {
       this.nodeName = nodeName;
-      this.collection = collection;
       zkClient = new SolrZkClient(zkAddress, TIMEOUT);
       zkStateReader = new ZkStateReader(zkClient);
       zkStateReader.createClusterStateWatchersAndUpdate();
@@ -105,7 +105,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       zkClient.close();
     }
 
-    public String publishState(String coreName, String coreNodeName, String stateName, int numShards)
+    public String publishState(String collection, String coreName, String coreNodeName, String stateName, int numShards)
        throws KeeperException, InterruptedException, IOException {
      if (stateName == null) {
        ElectionContext ec = electionContext.remove(coreName);
@@ -134,41 +134,40 @@ public class OverseerTest extends SolrTestCaseJ4 {
         q.offer(ZkStateReader.toJSON(m));
       }
 
-      for (int i = 0; i < 120; i++) {
-        String shardId = getShardId("http://" + nodeName + "/solr/", coreName);
-        if (shardId != null) {
-          try {
-            zkClient.makePath("/collections/" + collection + "/leader_elect/"
-                + shardId + "/election", true);
-          } catch (NodeExistsException nee) {}
-          ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
-              "http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
-              nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
-              ZkStateReader.SHARD_ID_PROP, shardId,
-              ZkStateReader.COLLECTION_PROP, collection,
-              ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
-          ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
-              elector, shardId, collection, nodeName + "_" + coreName, props,
-              zkStateReader);
-          elector.setup(ctx);
-          elector.joinElection(ctx, false);
-          return shardId;
+      if (collection.length() > 0) {
+        for (int i = 0; i < 120; i++) {
+          String shardId = getShardId(collection, coreNodeName);
+          if (shardId != null) {
+            try {
+              zkClient.makePath("/collections/" + collection + "/leader_elect/"
+                  + shardId + "/election", true);
+            } catch (NodeExistsException nee) {}
+            ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+                "http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
+                nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
+                ZkStateReader.SHARD_ID_PROP, shardId,
+                ZkStateReader.COLLECTION_PROP, collection,
+                ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+            ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
+                elector, shardId, collection, nodeName + "_" + coreName, props,
+                zkStateReader);
+            elector.setup(ctx);
+            elector.joinElection(ctx, false);
+            return shardId;
+          }
+          Thread.sleep(500);
         }
-        Thread.sleep(500);
       }
       return null;
     }
 
-    private String getShardId(final String baseUrl, final String coreName) {
-      Map slices = zkStateReader.getClusterState().getSlicesMap(
-          collection);
+    private String getShardId(String collection, String coreNodeName) {
+      Map slices = zkStateReader.getClusterState().getSlicesMap(collection);
       if (slices != null) {
         for (Slice slice : slices.values()) {
           for (Replica replica : slice.getReplicas()) {
-            // TODO: for really large clusters, we could 'index' on this
-            String rbaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-            String rcore = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-            if (baseUrl.equals(rbaseUrl) && coreName.equals(rcore)) {
+            String cnn = replica.getName();
+            if (coreNodeName.equals(cnn)) {
              return slice.getName();
            }
          }
@@ -226,17 +225,17 @@ public class OverseerTest extends SolrTestCaseJ4 {
       ZkStateReader reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      zkController = new MockZKController(server.getZkAddress(), "127.0.0.1", "collection1");
+      zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
 
       final int numShards=6;
 
       for (int i = 0; i < numShards; i++) {
-        assertNotNull("shard got no id?", zkController.publishState("core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
+        assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
       }
-
-      assertEquals(2, reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap().size());
-      assertEquals(2, reader.getClusterState().getSlice("collection1", "shard2").getReplicasMap().size());
-      assertEquals(2, reader.getClusterState().getSlice("collection1", "shard3").getReplicasMap().size());
+      Map rmap = reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap();
+      assertEquals(rmap.toString(), 2, rmap.size());
+      assertEquals(rmap.toString(), 2, reader.getClusterState().getSlice("collection1", "shard2").getReplicasMap().size());
+      assertEquals(rmap.toString(), 2, reader.getClusterState().getSlice("collection1", "shard3").getReplicasMap().size());
 
       //make sure leaders are in cloud state
       assertNotNull(reader.getLeaderUrl("collection1", "shard1", 15000));
@@ -258,6 +257,81 @@ public class OverseerTest extends SolrTestCaseJ4 {
     }
   }
 
+  @Test
+  public void testBadQueueItem() throws Exception {
+    String zkDir = dataDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    MockZKController zkController = null;
+    SolrZkClient zkClient = null;
+    SolrZkClient overseerClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
+
+      overseerClient = electNewOverseer(server.getZkAddress());
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
+
+      final int numShards=3;
+
+      for (int i = 0; i < numShards; i++) {
+        assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
+      }
+
+      assertEquals(1, reader.getClusterState().getSlice(collection, "shard1").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice(collection, "shard2").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice(collection, "shard3").getReplicasMap().size());
+
+      //make sure leaders are in cloud state
+      assertNotNull(reader.getLeaderUrl(collection, "shard1", 15000));
+      assertNotNull(reader.getLeaderUrl(collection, "shard2", 15000));
+      assertNotNull(reader.getLeaderUrl(collection, "shard3", 15000));
+
+      // publish a bad queue item
+      String emptyCollectionName = "";
+      zkController.publishState(emptyCollectionName, "core0", "node0", ZkStateReader.ACTIVE, 1);
+      zkController.publishState(emptyCollectionName, "core0", "node0", null, 1);
+
+      // make sure the Overseer is still processing items
+      for (int i = 0; i < numShards; i++) {
+        assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
+      }
+
+      assertEquals(1, reader.getClusterState().getSlice("collection2", "shard1").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice("collection2", "shard2").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice("collection2", "shard3").getReplicasMap().size());
+
+      //make sure leaders are in cloud state
+      assertNotNull(reader.getLeaderUrl("collection2", "shard1", 15000));
+      assertNotNull(reader.getLeaderUrl("collection2", "shard2", 15000));
+      assertNotNull(reader.getLeaderUrl("collection2", "shard3", 15000));
+
+    } finally {
+      if (DEBUG) {
+        if (zkController != null) {
+          zkClient.printLayoutToStdOut();
+        }
+      }
+      close(zkClient);
+      if (zkController != null) {
+        zkController.close();
+      }
+      close(overseerClient);
+      server.shutdown();
+    }
+  }
+
   @Test
   public void testShardAssignmentBigger() throws Exception {
     String zkDir = dataDir.getAbsolutePath() + File.separator
@@ -289,7 +363,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       reader.createClusterStateWatchersAndUpdate();
 
       for (int i = 0; i < nodeCount; i++) {
-        controllers[i] = new MockZKController(server.getZkAddress(), "node" + i, "collection1");
+        controllers[i] = new MockZKController(server.getZkAddress(), "node" + i);
       }
       for (int i = 0; i < nodeCount; i++) {
         nodeExecutors[i] = Executors.newFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment"));
@@ -306,7 +380,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
           final String coreName = "core" + slot;
 
           try {
-            ids[slot]=controllers[slot % nodeCount].publishState(coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
+            ids[slot]=controllers[slot % nodeCount].publishState(collection, coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
          } catch (Throwable e) {
            e.printStackTrace();
            fail("register threw exception:" + e.getClass());
@@ -551,21 +625,20 @@ public class OverseerTest extends SolrTestCaseJ4 {
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      mockController = new MockZKController(server.getZkAddress(), "node1",
-          "collection1");
+      mockController = new MockZKController(server.getZkAddress(), "node1");
 
       overseerClient = electNewOverseer(server.getZkAddress());
 
       Thread.sleep(1000);
-      mockController.publishState("core1", "core_node1",
mockController.publishState("core1", "core_node1", + mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1); - waitForCollections(reader, "collection1"); + waitForCollections(reader, collection); verifyStatus(reader, ZkStateReader.RECOVERING); int version = getClusterStateVersion(zkClient); - mockController.publishState("core1", "core_node1", ZkStateReader.ACTIVE, + mockController.publishState(collection, "core1", "core_node1", ZkStateReader.ACTIVE, 1); while (version == getClusterStateVersion(zkClient)); @@ -575,7 +648,7 @@ public class OverseerTest extends SolrTestCaseJ4 { overseerClient.close(); Thread.sleep(1000); // wait for overseer to get killed - mockController.publishState("core1", "core_node1", + mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1); version = getClusterStateVersion(zkClient); @@ -588,13 +661,13 @@ public class OverseerTest extends SolrTestCaseJ4 { assertEquals("Live nodes count does not match", 1, reader .getClusterState().getLiveNodes().size()); assertEquals("Shard count does not match", 1, reader.getClusterState() - .getSlice("collection1", "shard1").getReplicasMap().size()); + .getSlice(collection, "shard1").getReplicasMap().size()); version = getClusterStateVersion(zkClient); - mockController.publishState("core1", "core_node1", null, 1); + mockController.publishState(collection, "core1", "core_node1", null, 1); while (version == getClusterStateVersion(zkClient)); Thread.sleep(500); assertFalse("collection1 should be gone after publishing the null state", - reader.getClusterState().getCollections().contains("collection1")); + reader.getClusterState().getCollections().contains(collection)); } finally { close(mockController); close(overseerClient); @@ -676,17 +749,17 @@ public class OverseerTest extends SolrTestCaseJ4 { for (int i = 0; i < atLeast(4); i++) { killCounter.incrementAndGet(); //for each round allow 1 kill - mockController = new MockZKController(server.getZkAddress(), "node1", "collection1"); - mockController.publishState("core1", "node1", "state1",1); + mockController = new MockZKController(server.getZkAddress(), "node1"); + mockController.publishState(collection, "core1", "node1", "state1",1); if(mockController2!=null) { mockController2.close(); mockController2 = null; } - mockController.publishState("core1", "node1","state2",1); - mockController2 = new MockZKController(server.getZkAddress(), "node2", "collection1"); - mockController.publishState("core1", "node1", "state1",1); + mockController.publishState(collection, "core1", "node1","state2",1); + mockController2 = new MockZKController(server.getZkAddress(), "node2"); + mockController.publishState(collection, "core1", "node1", "state1",1); verifyShardLeader(reader, "collection1", "shard1", "core1"); - mockController2.publishState("core4", "node2", "state2" ,1); + mockController2.publishState(collection, "core4", "node2", "state2" ,1); mockController.close(); mockController = null; verifyShardLeader(reader, "collection1", "shard1", "core4"); @@ -729,11 +802,11 @@ public class OverseerTest extends SolrTestCaseJ4 { reader = new ZkStateReader(controllerClient); reader.createClusterStateWatchersAndUpdate(); - mockController = new MockZKController(server.getZkAddress(), "node1", "collection1"); + mockController = new MockZKController(server.getZkAddress(), "node1"); overseerClient = electNewOverseer(server.getZkAddress()); - mockController.publishState("core1", "core_node1", ZkStateReader.RECOVERING, 1); + 
       waitForCollections(reader, "collection1");
@@ -743,8 +816,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       int version = getClusterStateVersion(controllerClient);
 
-      mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
-      mockController.publishState("core1", "core_node1", ZkStateReader.RECOVERING, 1);
+      mockController = new MockZKController(server.getZkAddress(), "node1");
+      mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
 
       while (version == getClusterStateVersion(controllerClient));
@@ -794,11 +867,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
       reader = new ZkStateReader(controllerClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
+      mockController = new MockZKController(server.getZkAddress(), "node1");
 
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      mockController.publishState("core1", "node1", ZkStateReader.RECOVERING, 12);
+      mockController.publishState(collection, "core1", "node1", ZkStateReader.RECOVERING, 12);
 
       waitForCollections(reader, "collection1");
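For readers skimming the patch, the heart of the change is a log-and-skip loop around per-message processing, plus a guard for work items that arrive without a collection name. The following is a minimal standalone sketch of that pattern; the names are illustrative only (not Solr's classes), and the validation step throws an exception here for demonstration, whereas the patch's checkCollection only logs:

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

// Standalone sketch of the "log and skip bad work items" pattern from the patch.
// All names below are hypothetical; they are not the Solr classes touched above.
public class WorkQueueLoopSketch {

  // Roughly analogous to checkCollection(): reject messages with no collection
  // name. (The patch only logs; this sketch throws so the catch below runs.)
  static void validate(Map<String, String> message) {
    String collection = message.get("collection");
    if (collection == null || collection.trim().isEmpty()) {
      throw new IllegalArgumentException("Message has no collection: " + message);
    }
  }

  static void process(Map<String, String> message) {
    validate(message);
    System.out.println("processed " + message);
  }

  public static void main(String[] args) {
    Queue<Map<String, String>> queue = new ArrayDeque<>();
    queue.add(Map.of("collection", "collection1", "core", "core1"));
    queue.add(Map.of("core", "core0"));   // bad item: no collection specified
    queue.add(Map.of("collection", "collection2", "core", "core2"));

    Map<String, String> head;
    while ((head = queue.poll()) != null) {
      try {
        process(head);
      } catch (Exception e) {
        // Mirrors the patch: retrying a bad item would only fail again, so log
        // it and keep draining the queue instead of blocking on it forever.
        System.err.println("Could not process work item: " + e.getMessage());
      }
    }
  }
}
```

This is what testBadQueueItem above exercises: it publishes state for an empty collection name and then confirms that later, well-formed items for "collection2" are still processed by the Overseer.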