SOLR-11445: Overseer should not hang when processing a bad message

This commit is contained in:
Cao Manh Dat 2017-10-12 15:08:24 +07:00
parent b21721f152
commit df5fefb0db
2 changed files with 88 additions and 25 deletions

View File

@ -159,7 +159,15 @@ public class Overseer implements Closeable {
log.debug("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
try {
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
} catch (Exception e) {
if (isBadMessage(e)) {
log.warn("Exception when process message = {}, consider as bad message and poll out from the queue", message);
workQueue.poll();
}
throw e;
}
workQueue.poll(); // poll-ing removes the element we got by peek-ing
data = workQueue.peek();
}
@ -167,33 +175,28 @@ public class Overseer implements Closeable {
if (hadWorkItems) {
clusterState = zkStateWriter.writePendingUpdates();
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
}
log.error("Exception in Overseer work queue loop", e);
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer work queue loop", e);
log.error("Exception in Overseer when process message from work queue, retrying", e);
refreshClusterState = true;
continue;
}
}
byte[] head = null;
try {
head = stateUpdateQueue.peek(true);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
}
@ -237,16 +240,9 @@ public class Overseer implements Closeable {
// clean work queue
while (workQueue.poll() != null);
} catch (KeeperException.BadVersionException bve) {
log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state: {}", bve.getMessage());
refreshClusterState = true;
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // force refresh state in case of all errors
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
@ -262,6 +258,16 @@ public class Overseer implements Closeable {
}
}
/**
 * Tells whether an exception raised while handling a queue message marks that message as
 * invalid ("bad"), in which case the caller should remove the message from the queue instead
 * of retrying it.
 *
 * <ul>
 *   <li>{@link InterruptedException}: never a bad message.</li>
 *   <li>{@link KeeperException}: a bad message only when its code is {@code NONODE} or
 *       {@code NODEEXISTS}.</li>
 *   <li>Any other exception: treated as a bad message.</li>
 * </ul>
 *
 * @param e the exception caught while processing the message
 * @return {@code true} if the message should be considered bad and dropped
 */
private boolean isBadMessage(Exception e) {
  // An interrupt says nothing about the message itself.
  if (e instanceof InterruptedException) {
    return false;
  }
  // Anything that is neither an interrupt nor a ZooKeeper error is blamed on the message.
  if (!(e instanceof KeeperException)) {
    return true;
  }
  // For ZooKeeper errors, only missing/duplicate node errors are blamed on the message.
  final KeeperException.Code errorCode = ((KeeperException) e).code();
  return errorCode == KeeperException.Code.NONODE
      || errorCode == KeeperException.Code.NODEEXISTS;
}
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
final String operation = message.getStr(QUEUE_OPERATION);
List<ZkWriteCommand> zkWriteCommands = null;

View File

@ -690,6 +690,63 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
/**
 * Checks that a message which fails during processing does not block the messages queued
 * behind it, both in the Overseer's internal work queue and in the state update queue.
 *
 * <p>A "bad" CREATE message is enqueued ahead of a "good" one in each queue; the test then
 * waits for the collections named by the good messages to appear and asserts that both queues
 * end up empty.
 *
 * @throws Exception if the ZK test server, clients, or queue operations fail
 */
@Test
public void testExceptionWhenFlushClusterState() throws Exception {
  String zkDir = createTempDir("zkData").toFile().getAbsolutePath();

  ZkTestServer server = new ZkTestServer(zkDir);

  SolrZkClient controllerClient = null;
  SolrZkClient overseerClient = null;
  ZkStateReader reader = null;

  try {
    server.run();
    controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
    AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
    AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
    ZkController.createClusterZkNodes(controllerClient);

    reader = new ZkStateReader(controllerClient);
    reader.createClusterStateWatchersAndUpdate();

    // /collections was never created, so (per the original author's note) this message makes
    // the Overseer throw when it tries to flush the cluster state.
    ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
        "name", "collection1",
        ZkStateReader.REPLICATION_FACTOR, "1",
        ZkStateReader.NUM_SHARDS_PROP, "1",
        DocCollection.STATE_FORMAT, "2",
        "createNodeSet", "");
    ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
        "name", "collection2",
        ZkStateReader.REPLICATION_FACTOR, "1",
        ZkStateReader.NUM_SHARDS_PROP, "1",
        DocCollection.STATE_FORMAT, "1",
        "createNodeSet", "");

    // Phase 1: seed the internal work queue BEFORE an Overseer is elected, bad message first.
    ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(controllerClient, new Overseer.Stats());
    workQueue.offer(Utils.toJSON(badMessage));
    workQueue.offer(Utils.toJSON(goodMessage));
    overseerClient = electNewOverseer(server.getZkAddress());
    waitForCollections(reader, "collection2");

    // Phase 2: same bad-then-good pattern through the state update queue of the live Overseer.
    ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
    q.offer(Utils.toJSON(badMessage));
    q.offer(Utils.toJSON(goodMessage.plus("name", "collection3")));
    waitForCollections(reader, "collection2", "collection3");

    assertNotNull(reader.getClusterState().getCollectionOrNull("collection2"));
    assertNotNull(reader.getClusterState().getCollectionOrNull("collection3"));
    // assertNull (instead of assertTrue(x == null)) reports the leftover value on failure.
    assertNull("internal work queue should be empty", workQueue.peek());
    assertNull("state update queue should be empty", q.peek());
  } finally {
    close(overseerClient);
    close(controllerClient);
    close(reader);
    server.shutdown();
  }
}
@Test
public void testShardLeaderChange() throws Exception {
String zkDir = createTempDir("zkData").toFile().getAbsolutePath();