SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases, can go into an infinite loop if cluster state in ZooKeeper is modified externally

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1695746 13f79535-47bb-0310-9956-ffa450edef68
Shalin Shekhar Mangar 2015-08-13 18:31:09 +00:00
parent 9dc862147e
commit 7ebc890bec
5 changed files with 352 additions and 16 deletions
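
The failure mode behind this issue is a ZooKeeper compare-and-set write made with a stale znode version. The following stand-alone sketch (not part of this commit; the connect string and znode path are placeholders, and it assumes the node already exists and the session connects promptly) shows how an external write to the same node turns a later conditional setData into a KeeperException.BadVersionException:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class BadVersionDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder connect string; error handling and connection waiting omitted for brevity.
    ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, event -> {});

    Stat stat = new Stat();
    byte[] data = zk.getData("/clusterstate.json", false, stat); // remember version N

    // An "external" writer updates the node, bumping it to version N+1.
    zk.setData("/clusterstate.json", data, stat.getVersion());

    try {
      // A compare-and-set against the remembered (now stale) version N fails.
      zk.setData("/clusterstate.json", data, stat.getVersion());
    } catch (KeeperException.BadVersionException expected) {
      // The writer's cached version is stale; it must re-read the node before writing again.
    }
    zk.close();
  }
}

This is exactly the situation the Overseer must recover from instead of retrying the same stale write forever.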

solr/CHANGES.txt

@ -133,6 +133,10 @@ Bug Fixes
* SOLR-7836: Possible deadlock when closing refcounted index writers.
(Jessica Cheng Mallet, Erick Erickson)
* SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases,
can go into an infinite loop if cluster state in ZooKeeper is modified externally.
(Scott Blum, shalin)
Optimizations
----------------------

Overseer.java

@ -136,7 +136,7 @@ public class Overseer implements Closeable {
log.info("Starting to work on the main queue");
try {
ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
ZkStateWriter zkStateWriter = null;
ClusterState clusterState = null;
boolean refreshClusterState = true; // let's refresh in the first iteration
while (!this.isClosed) {
@ -153,6 +153,7 @@ public class Overseer implements Closeable {
try {
reader.updateClusterState();
clusterState = reader.getClusterState();
zkStateWriter = new ZkStateWriter(reader, stats);
refreshClusterState = false;
// if there were any errors while processing
@ -237,13 +238,16 @@ public class Overseer implements Closeable {
// clean work queue
while (workQueue.poll() != null);
} catch (KeeperException.BadVersionException bve) {
log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state", bve);
refreshClusterState = true;
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // it might have been a bad version error
refreshClusterState = true; // force refresh state in case of all errors
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
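
The hunks above rework the Overseer's main loop so that the ZkStateWriter is created lazily and rebuilt whenever the cluster state has to be re-read. A condensed sketch of the resulting control flow (simplified; fields such as reader, stats and isClosed are assumed, and queue draining is elided):

private void runMainQueueLoop() {
  ZkStateWriter zkStateWriter = null;
  ClusterState clusterState = null;
  boolean refreshClusterState = true; // refresh in the first iteration
  while (!isClosed) {
    try {
      if (refreshClusterState) {
        reader.updateClusterState();
        clusterState = reader.getClusterState();
        // Build a fresh writer: anything buffered in the old one may hold stale znode versions.
        zkStateWriter = new ZkStateWriter(reader, stats);
        refreshClusterState = false;
      }
      // ... poll the state-update queue and apply each message via zkStateWriter,
      // threading clusterState through the enqueueUpdate calls ...
    } catch (KeeperException.BadVersionException bve) {
      // Cluster state in ZK was changed externally; re-read it and rebuild the writer.
      refreshClusterState = true;
    } catch (KeeperException e) {
      if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
        return; // cannot talk to ZK any more, exit the loop
      }
      refreshClusterState = true; // force a refresh after any other ZK error as well
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
  }
}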

ZkStateWriter.java

@ -36,8 +36,28 @@ import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
/**
* ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
* both stateFormat=1 collections (stored in the shared /clusterstate.json in ZK) and stateFormat=2
* collections, each of which gets its own individual state.json in ZK.
*
* Updates to the cluster state are specified using the
* {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates
* to reduce the number of writes to ZK. The buffered updates are flushed automatically during
* <code>enqueueUpdate</code> if necessary. The {@link #writePendingUpdates()} method can be used to force
* a flush of any pending updates.
*
* If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()}
* throws a {@link org.apache.zookeeper.KeeperException.BadVersionException}, then the internal buffered state of
* the class is suspect; the current instance should be discarded and a new instance created and used for any
* future updates.
*/
public class ZkStateWriter {
private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
/**
* Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
*/
public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
protected final ZkStateReader reader;
@ -52,6 +72,12 @@ public class ZkStateWriter {
protected int lastStateFormat = -1; // sentinel value
protected String lastCollectionName = null;
/**
* Set to true if we ever get a BadVersionException so that we can disallow future operations
* with this instance.
*/
protected boolean invalidState = false;
public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
assert zkStateReader != null;
@ -59,7 +85,32 @@ public class ZkStateWriter {
this.stats = stats;
}
public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
/**
* Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
* {@link ClusterState} is returned and it is expected that the caller will use the returned
* cluster state for the subsequent invocation of this method.
* <p>
* The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
* logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
* last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can
* be used to force an immediate flush of pending cluster state changes.
*
* @param prevState the cluster state information on which the given <code>cmd</code> is applied
* @param cmd the {@link ZkWriteCommand} which specifies the change to be applied to cluster state
* @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
* for any callbacks
* @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
* <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is returned unmodified.
* @throws IllegalStateException if the current instance is no longer usable. The current instance must be
* discarded.
* @throws Exception on an error in ZK operations or callback. If a flush to ZooKeeper results
* in a {@link org.apache.zookeeper.KeeperException.BadVersionException}, this instance becomes unusable and
* must be discarded.
*/
public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
if (cmd == NO_OP) return prevState;
if (maybeFlushBefore(cmd)) {
@ -129,14 +180,25 @@ public class ZkStateWriter {
return false;
lastCollectionName = cmd.name;
lastStateFormat = cmd.collection.getStateFormat();
return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL;
}
public boolean hasPendingUpdates() {
return !updates.isEmpty() || isClusterStateModified;
}
public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
/**
* Writes all pending updates to ZooKeeper and returns the modified cluster state
*
* @return the modified cluster state
* @throws IllegalStateException if the current instance is no longer usable and must be discarded
* @throws KeeperException if any ZooKeeper operation results in an error
* @throws InterruptedException if the current thread is interrupted
*/
public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
if (!hasPendingUpdates()) return clusterState;
TimerContext timerContext = stats.time("update_state");
boolean success = false;
@ -188,6 +250,10 @@ public class ZkStateWriter {
isClusterStateModified = false;
}
success = true;
} catch (KeeperException.BadVersionException bve) {
// this is a tragic error, we must disallow usage of this instance
invalidState = true;
throw bve;
} finally {
timerContext.stop();
if (success) {
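
Putting the new javadoc together, a caller is expected to use the writer roughly as follows. This is a simplified sketch, not code from this commit: reader, stats and cmd are assumed to exist, callbacks are elided, and the enclosing method is assumed to declare throws Exception.

ZkStateWriter writer = new ZkStateWriter(reader, stats);
ClusterState state = reader.getClusterState();
try {
  state = writer.enqueueUpdate(state, cmd, null); // may buffer or flush internally
  if (writer.hasPendingUpdates()) {
    state = writer.writePendingUpdates();         // force a flush of anything still buffered
  }
} catch (KeeperException.BadVersionException bve) {
  // The state in ZK changed underneath us; this writer instance is now unusable.
  // Re-read the cluster state and build a new writer before retrying.
  reader.updateClusterState();
  state = reader.getClusterState();
  writer = new ZkStateWriter(reader, stats);
}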

OverseerTest.java

@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -31,8 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.overseer.OverseerAction;
@ -58,6 +57,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -573,7 +573,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
q.offer(Utils.toJSON(m));
verifyStatus(reader, Replica.State.ACTIVE);
verifyStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE);
} finally {
@ -585,20 +585,20 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
private void verifyStatus(ZkStateReader reader, Replica.State expectedState) throws InterruptedException {
private void verifyStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
int maxIterations = 100;
Replica.State coreState = null;
while(maxIterations-->0) {
Slice slice = reader.getClusterState().getSlice("collection1", "shard1");
Slice slice = reader.getClusterState().getSlice(collection, shard);
if(slice!=null) {
coreState = slice.getReplicasMap().get("core_node1").getState();
coreState = slice.getReplicasMap().get(coreNodeName).getState();
if(coreState == expectedState) {
return;
}
}
Thread.sleep(50);
}
fail("Illegal state, was:" + coreState + " expected:" + expectedState + "clusterState:" + reader.getClusterState());
fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
}
private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException {
@ -649,7 +649,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
waitForCollections(reader, collection);
verifyStatus(reader, Replica.State.RECOVERING);
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
int version = getClusterStateVersion(zkClient);
@ -657,7 +657,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
while (version == getClusterStateVersion(zkClient));
verifyStatus(reader, Replica.State.ACTIVE);
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.ACTIVE);
version = getClusterStateVersion(zkClient);
overseerClient.close();
Thread.sleep(1000); // wait for overseer to get killed
@ -669,7 +669,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
while (version == getClusterStateVersion(zkClient));
verifyStatus(reader, Replica.State.RECOVERING);
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
assertEquals("Live nodes count does not match", 1, reader
.getClusterState().getLiveNodes().size());
@ -884,7 +884,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
waitForCollections(reader, "collection1");
verifyStatus(reader, Replica.State.RECOVERING);
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
mockController.close();
@ -1163,6 +1163,111 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
@Test
public void testExternalClusterStateChangeBehavior() throws Exception {
String zkDir = createTempDir("testExternalClusterStateChangeBehavior").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
SolrZkClient overseerClient = null;
try {
server.run();
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
ZkController.createClusterZkNodes(zkClient);
zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
overseerClient = electNewOverseer(server.getZkAddress());
DistributedQueue q = Overseer.getInQueue(zkClient);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "c1",
ZkStateReader.CORE_NAME_PROP, "core1",
ZkStateReader.ROLES_PROP, "",
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
q.offer(Utils.toJSON(m));
waitForCollections(reader, "c1");
verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "c1",
ZkStateReader.CORE_NAME_PROP, "core1",
ZkStateReader.ROLES_PROP, "",
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
q.offer(Utils.toJSON(m));
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "c1",
ZkStateReader.CORE_NAME_PROP, "core1",
ZkStateReader.ROLES_PROP, "",
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
q.offer(Utils.toJSON(m));
Stat stat = new Stat();
byte[] data = zkClient.getData("/clusterstate.json", null, stat, true);
// Simulate an external modification
zkClient.setData("/clusterstate.json", data, true);
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
"name", "test",
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.REPLICATION_FACTOR, "1",
DocCollection.STATE_FORMAT, "2"
);
q.offer(Utils.toJSON(m));
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(),
"collection", "test",
ZkStateReader.SHARD_ID_PROP, "x",
ZkStateReader.REPLICATION_FACTOR, "1"
);
q.offer(Utils.toJSON(m));
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(),
"collection", "test",
ZkStateReader.SHARD_ID_PROP, "x",
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.CORE_NAME_PROP, "core1",
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()
);
q.offer(Utils.toJSON(m));
waitForCollections(reader, "test");
verifyStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
waitForCollections(reader, "c1");
verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
} finally {
close(zkClient);
close(overseerClient);
close(reader);
server.shutdown();
}
}
private void close(ZkStateReader reader) {
if (reader != null) {
reader.close();

ZkStateWriterTest.java

@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import java.util.HashMap;
import java.util.Map;
@ -204,4 +205,160 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
}
public void testExternalModificationToSharedClusterState() throws Exception {
String zkDir = createTempDir("testExternalModification").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
// create collection 1 with stateFormat = 1
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
writer.enqueueUpdate(reader.getClusterState(), c1, null);
writer.writePendingUpdates();
reader.updateClusterState();
ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
assertTrue(clusterState.hasCollection("c1"));
assertFalse(clusterState.hasCollection("c2"));
// Simulate an external modification to /clusterstate.json
byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
zkClient.setData("/clusterstate.json", data, true);
// enqueue another c1 so that ZkStateWriter has pending updates
writer.enqueueUpdate(clusterState, c1, null);
assertTrue(writer.hasPendingUpdates());
// create collection 2 with stateFormat = 1
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
try {
writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) {
// expected
}
reader.updateClusterState();
try {
writer.enqueueUpdate(reader.getClusterState(), c2, null);
fail("enqueueUpdate after BadVersionException should not have suceeded");
} catch (IllegalStateException e) {
// expected
}
try {
writer.writePendingUpdates();
fail("writePendingUpdates after BadVersionException should not have suceeded");
} catch (IllegalStateException e) {
// expected
}
} finally {
IOUtils.close(zkClient);
server.shutdown();
}
}
public void testExternalModificationToStateFormat2() throws Exception {
String zkDir = createTempDir("testExternalModificationToStateFormat2").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
ClusterState state = reader.getClusterState();
// create collection 2 with stateFormat = 2
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
int sharedClusterStateVersion = state.getZkClusterStateVersion();
int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
// Simulate an external modification to /collections/c2/state.json
byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
// get the most up-to-date state
reader.updateClusterState();
state = reader.getClusterState();
assertTrue(state.hasCollection("c2"));
assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
// enqueue an update to stateFormat2 collection such that update is pending
state = writer.enqueueUpdate(state, c2, null);
assertTrue(writer.hasPendingUpdates());
// get the most up-to-date state
reader.updateClusterState();
state = reader.getClusterState();
// enqueue a stateFormat=1 collection which should cause a flush
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
try {
state = writer.enqueueUpdate(state, c1, null);
fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) {
// expected
}
try {
writer.enqueueUpdate(reader.getClusterState(), c2, null);
fail("enqueueUpdate after BadVersionException should not have suceeded");
} catch (IllegalStateException e) {
// expected
}
try {
writer.writePendingUpdates();
fail("writePendingUpdates after BadVersionException should not have suceeded");
} catch (IllegalStateException e) {
// expected
}
} finally {
IOUtils.close(zkClient);
server.shutdown();
}
}
}