SOLR-5811: The Overseer will retry work items until success, which is a serious problem if you hit a bad work item.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1574280 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-03-05 01:33:35 +00:00
parent 96bcbefdd4
commit 9f701bd304
4 changed files with 178 additions and 69 deletions

View File

@ -116,6 +116,9 @@ Bug Fixes
* SOLR-5761: HttpSolrServer has a few fields that can be set via setters but
are not volatile. (Mark Miller, Gregory Chanan)
* SOLR-5811: The Overseer will retry work items until success, which is a serious
problem if you hit a bad work item. (Mark Miller)
Optimizations
----------------------
* SOLR-1880: Distributed Search skips GET_FIELDS stage if EXECUTE_QUERY

View File

@ -123,7 +123,16 @@ public class Overseer {
else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
try {
clusterState = processMessage(clusterState, message, operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with
// ZooKeeper in which case another Overseer should take over
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Could not process Overseer message", e);
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
@ -189,8 +198,16 @@ public class Overseer {
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String operation = message.getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
try {
clusterState = processMessage(clusterState, message, operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with
// ZooKeeper in which case another Overseer should take over
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Could not process Overseer message", e);
}
workQueue.offer(head.getBytes());
stateUpdateQueue.poll();
@ -294,6 +311,7 @@ public class Overseer {
private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
log.info("createReplica() {} ", message);
String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, coll);
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
Slice sl = clusterState.getSlice(coll, slice);
if(sl == null){
@ -334,6 +352,7 @@ public class Overseer {
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
for (String key : message.keySet()) {
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
@ -358,6 +377,7 @@ public class Overseer {
private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
String routeKey = message.getStr("routeKey");
String range = message.getStr("range");
@ -397,8 +417,15 @@ public class Overseer {
return clusterState;
}
private void checkCollection(ZkNodeProps message, String collection) {
if (collection == null || collection.trim().length() == 0) {
log.error("Skipping invalid Overseer message because it has no collection specified: " + message);
}
}
private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
String routeKeyStr = message.getStr("routeKey");
@ -424,6 +451,7 @@ public class Overseer {
private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
Slice slice = clusterState.getSlice(collection, shardId);
if (slice == null) {
@ -470,6 +498,7 @@ public class Overseer {
private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
if(collection==null || sliceName == null){
@ -490,9 +519,7 @@ public class Overseer {
*/
private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
assert collection.length() > 0 : message;
checkCollection(message, collection);
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
log.info("Update state numShards={} message={}", numShards, message);
@ -851,9 +878,7 @@ public class Overseer {
private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr("name");
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
// newCollections.remove(collection);
checkCollection(message, collection);
// ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
@ -864,6 +889,7 @@ public class Overseer {
*/
private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
@ -889,6 +915,7 @@ public class Overseer {
String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
checkCollection(message, collection);
// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
// DocCollection coll = newCollections.get(collection);

View File

@ -1064,6 +1064,12 @@ public final class ZkController {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
assert collection != null;
if (collection == null || collection.trim().length() == 0) {
log.error("No collection was specified.");
return;
}
ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
if (context != null) {

View File

@ -64,18 +64,18 @@ public class OverseerTest extends SolrTestCaseJ4 {
private List<Overseer> overseers = new ArrayList<Overseer>();
private List<ZkStateReader> readers = new ArrayList<ZkStateReader>();
private String collection = "collection1";
public static class MockZKController{
private final SolrZkClient zkClient;
private final ZkStateReader zkStateReader;
private final String nodeName;
private final String collection;
private final LeaderElector elector;
private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException {
this.nodeName = nodeName;
this.collection = collection;
zkClient = new SolrZkClient(zkAddress, TIMEOUT);
zkStateReader = new ZkStateReader(zkClient);
zkStateReader.createClusterStateWatchersAndUpdate();
@ -105,7 +105,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
zkClient.close();
}
public String publishState(String coreName, String coreNodeName, String stateName, int numShards)
public String publishState(String collection, String coreName, String coreNodeName, String stateName, int numShards)
throws KeeperException, InterruptedException, IOException {
if (stateName == null) {
ElectionContext ec = electionContext.remove(coreName);
@ -134,41 +134,40 @@ public class OverseerTest extends SolrTestCaseJ4 {
q.offer(ZkStateReader.toJSON(m));
}
for (int i = 0; i < 120; i++) {
String shardId = getShardId("http://" + nodeName + "/solr/", coreName);
if (shardId != null) {
try {
zkClient.makePath("/collections/" + collection + "/leader_elect/"
+ shardId + "/election", true);
} catch (NodeExistsException nee) {}
ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
elector, shardId, collection, nodeName + "_" + coreName, props,
zkStateReader);
elector.setup(ctx);
elector.joinElection(ctx, false);
return shardId;
if (collection.length() > 0) {
for (int i = 0; i < 120; i++) {
String shardId = getShardId(collection, coreNodeName);
if (shardId != null) {
try {
zkClient.makePath("/collections/" + collection + "/leader_elect/"
+ shardId + "/election", true);
} catch (NodeExistsException nee) {}
ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
elector, shardId, collection, nodeName + "_" + coreName, props,
zkStateReader);
elector.setup(ctx);
elector.joinElection(ctx, false);
return shardId;
}
Thread.sleep(500);
}
Thread.sleep(500);
}
return null;
}
private String getShardId(final String baseUrl, final String coreName) {
Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(
collection);
private String getShardId(String collection, String coreNodeName) {
Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(collection);
if (slices != null) {
for (Slice slice : slices.values()) {
for (Replica replica : slice.getReplicas()) {
// TODO: for really large clusters, we could 'index' on this
String rbaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
String rcore = replica.getStr(ZkStateReader.CORE_NAME_PROP);
if (baseUrl.equals(rbaseUrl) && coreName.equals(rcore)) {
String cnn = replica.getName();
if (coreNodeName.equals(cnn)) {
return slice.getName();
}
}
@ -226,17 +225,17 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
zkController = new MockZKController(server.getZkAddress(), "127.0.0.1", "collection1");
zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
final int numShards=6;
for (int i = 0; i < numShards; i++) {
assertNotNull("shard got no id?", zkController.publishState("core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
}
assertEquals(2, reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap().size());
assertEquals(2, reader.getClusterState().getSlice("collection1", "shard2").getReplicasMap().size());
assertEquals(2, reader.getClusterState().getSlice("collection1", "shard3").getReplicasMap().size());
Map<String,Replica> rmap = reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap();
assertEquals(rmap.toString(), 2, rmap.size());
assertEquals(rmap.toString(), 2, reader.getClusterState().getSlice("collection1", "shard2").getReplicasMap().size());
assertEquals(rmap.toString(), 2, reader.getClusterState().getSlice("collection1", "shard3").getReplicasMap().size());
//make sure leaders are in cloud state
assertNotNull(reader.getLeaderUrl("collection1", "shard1", 15000));
@ -258,6 +257,81 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
@Test
public void testBadQueueItem() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
MockZKController zkController = null;
SolrZkClient zkClient = null;
SolrZkClient overseerClient = null;
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
overseerClient = electNewOverseer(server.getZkAddress());
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
final int numShards=3;
for (int i = 0; i < numShards; i++) {
assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
}
assertEquals(1, reader.getClusterState().getSlice(collection, "shard1").getReplicasMap().size());
assertEquals(1, reader.getClusterState().getSlice(collection, "shard2").getReplicasMap().size());
assertEquals(1, reader.getClusterState().getSlice(collection, "shard3").getReplicasMap().size());
//make sure leaders are in cloud state
assertNotNull(reader.getLeaderUrl(collection, "shard1", 15000));
assertNotNull(reader.getLeaderUrl(collection, "shard2", 15000));
assertNotNull(reader.getLeaderUrl(collection, "shard3", 15000));
// publish a bad queue item
String emptyCollectionName = "";
zkController.publishState(emptyCollectionName, "core0", "node0", ZkStateReader.ACTIVE, 1);
zkController.publishState(emptyCollectionName, "core0", "node0", null, 1);
// make sure the Overseer is still processing items
for (int i = 0; i < numShards; i++) {
assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
}
assertEquals(1, reader.getClusterState().getSlice("collection2", "shard1").getReplicasMap().size());
assertEquals(1, reader.getClusterState().getSlice("collection2", "shard2").getReplicasMap().size());
assertEquals(1, reader.getClusterState().getSlice("collection2", "shard3").getReplicasMap().size());
//make sure leaders are in cloud state
assertNotNull(reader.getLeaderUrl("collection2", "shard1", 15000));
assertNotNull(reader.getLeaderUrl("collection2", "shard2", 15000));
assertNotNull(reader.getLeaderUrl("collection2", "shard3", 15000));
} finally {
if (DEBUG) {
if (zkController != null) {
zkClient.printLayoutToStdOut();
}
}
close(zkClient);
if (zkController != null) {
zkController.close();
}
close(overseerClient);
server.shutdown();
}
}
@Test
public void testShardAssignmentBigger() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
@ -289,7 +363,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader.createClusterStateWatchersAndUpdate();
for (int i = 0; i < nodeCount; i++) {
controllers[i] = new MockZKController(server.getZkAddress(), "node" + i, "collection1");
controllers[i] = new MockZKController(server.getZkAddress(), "node" + i);
}
for (int i = 0; i < nodeCount; i++) {
nodeExecutors[i] = Executors.newFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment"));
@ -306,7 +380,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
final String coreName = "core" + slot;
try {
ids[slot]=controllers[slot % nodeCount].publishState(coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
ids[slot]=controllers[slot % nodeCount].publishState(collection, coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
} catch (Throwable e) {
e.printStackTrace();
fail("register threw exception:" + e.getClass());
@ -551,21 +625,20 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1",
"collection1");
mockController = new MockZKController(server.getZkAddress(), "node1");
overseerClient = electNewOverseer(server.getZkAddress());
Thread.sleep(1000);
mockController.publishState("core1", "core_node1",
mockController.publishState(collection, "core1", "core_node1",
ZkStateReader.RECOVERING, 1);
waitForCollections(reader, "collection1");
waitForCollections(reader, collection);
verifyStatus(reader, ZkStateReader.RECOVERING);
int version = getClusterStateVersion(zkClient);
mockController.publishState("core1", "core_node1", ZkStateReader.ACTIVE,
mockController.publishState(collection, "core1", "core_node1", ZkStateReader.ACTIVE,
1);
while (version == getClusterStateVersion(zkClient));
@ -575,7 +648,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient.close();
Thread.sleep(1000); // wait for overseer to get killed
mockController.publishState("core1", "core_node1",
mockController.publishState(collection, "core1", "core_node1",
ZkStateReader.RECOVERING, 1);
version = getClusterStateVersion(zkClient);
@ -588,13 +661,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
assertEquals("Live nodes count does not match", 1, reader
.getClusterState().getLiveNodes().size());
assertEquals("Shard count does not match", 1, reader.getClusterState()
.getSlice("collection1", "shard1").getReplicasMap().size());
.getSlice(collection, "shard1").getReplicasMap().size());
version = getClusterStateVersion(zkClient);
mockController.publishState("core1", "core_node1", null, 1);
mockController.publishState(collection, "core1", "core_node1", null, 1);
while (version == getClusterStateVersion(zkClient));
Thread.sleep(500);
assertFalse("collection1 should be gone after publishing the null state",
reader.getClusterState().getCollections().contains("collection1"));
reader.getClusterState().getCollections().contains(collection));
} finally {
close(mockController);
close(overseerClient);
@ -676,17 +749,17 @@ public class OverseerTest extends SolrTestCaseJ4 {
for (int i = 0; i < atLeast(4); i++) {
killCounter.incrementAndGet(); //for each round allow 1 kill
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
mockController.publishState("core1", "node1", "state1",1);
mockController = new MockZKController(server.getZkAddress(), "node1");
mockController.publishState(collection, "core1", "node1", "state1",1);
if(mockController2!=null) {
mockController2.close();
mockController2 = null;
}
mockController.publishState("core1", "node1","state2",1);
mockController2 = new MockZKController(server.getZkAddress(), "node2", "collection1");
mockController.publishState("core1", "node1", "state1",1);
mockController.publishState(collection, "core1", "node1","state2",1);
mockController2 = new MockZKController(server.getZkAddress(), "node2");
mockController.publishState(collection, "core1", "node1", "state1",1);
verifyShardLeader(reader, "collection1", "shard1", "core1");
mockController2.publishState("core4", "node2", "state2" ,1);
mockController2.publishState(collection, "core4", "node2", "state2" ,1);
mockController.close();
mockController = null;
verifyShardLeader(reader, "collection1", "shard1", "core4");
@ -729,11 +802,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
mockController = new MockZKController(server.getZkAddress(), "node1");
overseerClient = electNewOverseer(server.getZkAddress());
mockController.publishState("core1", "core_node1", ZkStateReader.RECOVERING, 1);
mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
waitForCollections(reader, "collection1");
@ -743,8 +816,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
int version = getClusterStateVersion(controllerClient);
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
mockController.publishState("core1", "core_node1", ZkStateReader.RECOVERING, 1);
mockController = new MockZKController(server.getZkAddress(), "node1");
mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
while (version == getClusterStateVersion(controllerClient));
@ -794,11 +867,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
mockController = new MockZKController(server.getZkAddress(), "node1");
overseerClient = electNewOverseer(server.getZkAddress());
mockController.publishState("core1", "node1", ZkStateReader.RECOVERING, 12);
mockController.publishState(collection, "core1", "node1", ZkStateReader.RECOVERING, 12);
waitForCollections(reader, "collection1");