From 7ebc890bec9491fd1613c0701d2c3b7f023f6812 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Thu, 13 Aug 2015 18:31:09 +0000 Subject: [PATCH] SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases, can go into an infinite loop if cluster state in ZooKeeper is modified externally git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1695746 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 4 + .../java/org/apache/solr/cloud/Overseer.java | 8 +- .../solr/cloud/overseer/ZkStateWriter.java | 72 +++++++- .../org/apache/solr/cloud/OverseerTest.java | 127 ++++++++++++-- .../cloud/overseer/ZkStateWriterTest.java | 157 ++++++++++++++++++ 5 files changed, 352 insertions(+), 16 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 23ab51c7e71..e433a1e25b6 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -133,6 +133,10 @@ Bug Fixes * SOLR-7836: Possible deadlock when closing refcounted index writers. (Jessica Cheng Mallet, Erick Erickson) +* SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases, + can go into an infinite loop if cluster state in ZooKeeper is modified externally. + (Scott Blum, shalin) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 6950d559bd4..abb24790f57 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -136,7 +136,7 @@ public class Overseer implements Closeable { log.info("Starting to work on the main queue"); try { - ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats); + ZkStateWriter zkStateWriter = null; ClusterState clusterState = null; boolean refreshClusterState = true; // let's refresh in the first iteration while (!this.isClosed) { @@ -153,6 +153,7 @@ public class Overseer implements Closeable { try { reader.updateClusterState(); clusterState = reader.getClusterState(); + zkStateWriter = new ZkStateWriter(reader, stats); refreshClusterState = false; // if there were any errors while processing @@ -237,13 +238,16 @@ public class Overseer implements Closeable { // clean work queue while (workQueue.poll() != null); + } catch (KeeperException.BadVersionException bve) { + log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state", bve); + refreshClusterState = true; } catch (KeeperException e) { if (e.code() == KeeperException.Code.SESSIONEXPIRED) { log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); return; } log.error("Exception in Overseer main queue loop", e); - refreshClusterState = true; // it might have been a bad version error + refreshClusterState = true; // force refresh state in case of all errors } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java index 768c9584763..1f813d5ac70 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java @@ -36,8 +36,28 @@ import org.slf4j.LoggerFactory; import static java.util.Collections.singletonMap; +/** + * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for + * both stateFormat=1 collection (stored in shared /clusterstate.json in ZK) and stateFormat=2 collections + * each of which get their own individual state.json in ZK. + * + * Updates to the cluster state are specified using the + * {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates + * to reduce the number of writes to ZK. The buffered updates are flushed during enqueueUpdate + * automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates. + * + * If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()} + * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the + * class is suspect and the current instance of the class should be discarded and a new instance should be created + * and used for any future updates. + */ public class ZkStateWriter { + private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS); private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class); + + /** + * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state + */ public static ZkWriteCommand NO_OP = ZkWriteCommand.noop(); protected final ZkStateReader reader; @@ -52,6 +72,12 @@ public class ZkStateWriter { protected int lastStateFormat = -1; // sentinel value protected String lastCollectionName = null; + /** + * Set to true if we ever get a BadVersionException so that we can disallow future operations + * with this instance + */ + protected boolean invalidState = false; + public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) { assert zkStateReader != null; @@ -59,7 +85,32 @@ public class ZkStateWriter { this.stats = stats; } - public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception { + /** + * Applies the given {@link ZkWriteCommand} on the prevState. The modified + * {@link ClusterState} is returned and it is expected that the caller will use the returned + * cluster state for the subsequent invocation of this method. + *

+ * The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering + * logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the + * last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can + * be used to force an immediate flush of pending cluster state changes. + * + * @param prevState the cluster state information on which the given cmd is applied + * @param cmd the {@link ZkWriteCommand} which specifies the change to be applied to cluster state + * @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used + * for any callbacks + * @return modified cluster state created after applying cmd to prevState. If + * cmd is a no-op ({@link #NO_OP}) then the prevState is returned unmodified. + * @throws IllegalStateException if the current instance is no longer usable. The current instance must be + * discarded. + * @throws Exception on an error in ZK operations or callback. If a flush to ZooKeeper results + * in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and + * must be discarded + */ + public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception { + if (invalidState) { + throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used"); + } if (cmd == NO_OP) return prevState; if (maybeFlushBefore(cmd)) { @@ -129,14 +180,25 @@ public class ZkStateWriter { return false; lastCollectionName = cmd.name; lastStateFormat = cmd.collection.getStateFormat(); - return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS); + return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL; } public boolean hasPendingUpdates() { return !updates.isEmpty() || isClusterStateModified; } - public ClusterState writePendingUpdates() throws KeeperException, InterruptedException { + /** + * Writes all pending updates to ZooKeeper and returns the modified cluster state + * + * @return the modified cluster state + * @throws IllegalStateException if the current instance is no longer usable and must be discarded + * @throws KeeperException if any ZooKeeper operation results in an error + * @throws InterruptedException if the current thread is interrupted + */ + public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException { + if (invalidState) { + throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used"); + } if (!hasPendingUpdates()) return clusterState; TimerContext timerContext = stats.time("update_state"); boolean success = false; @@ -188,6 +250,10 @@ public class ZkStateWriter { isClusterStateModified = false; } success = true; + } catch (KeeperException.BadVersionException bve) { + // this is a tragic error, we must disallow usage of this instance + invalidState = true; + throw bve; } finally { timerContext.stop(); if (success) { diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 63cc7af9537..41f76f2cb7a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud; * limitations under the License. */ +import javax.xml.parsers.ParserConfigurationException; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -31,8 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import javax.xml.parsers.ParserConfigurationException; - import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.overseer.OverseerAction; @@ -58,6 +57,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -573,7 +573,7 @@ public class OverseerTest extends SolrTestCaseJ4 { q.offer(Utils.toJSON(m)); - verifyStatus(reader, Replica.State.ACTIVE); + verifyStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE); } finally { @@ -585,20 +585,20 @@ public class OverseerTest extends SolrTestCaseJ4 { } } - private void verifyStatus(ZkStateReader reader, Replica.State expectedState) throws InterruptedException { + private void verifyStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException { int maxIterations = 100; Replica.State coreState = null; while(maxIterations-->0) { - Slice slice = reader.getClusterState().getSlice("collection1", "shard1"); + Slice slice = reader.getClusterState().getSlice(collection, shard); if(slice!=null) { - coreState = slice.getReplicasMap().get("core_node1").getState(); + coreState = slice.getReplicasMap().get(coreNodeName).getState(); if(coreState == expectedState) { return; } } Thread.sleep(50); } - fail("Illegal state, was:" + coreState + " expected:" + expectedState + "clusterState:" + reader.getClusterState()); + fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState()); } private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException { @@ -649,7 +649,7 @@ public class OverseerTest extends SolrTestCaseJ4 { mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1); waitForCollections(reader, collection); - verifyStatus(reader, Replica.State.RECOVERING); + verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING); int version = getClusterStateVersion(zkClient); @@ -657,7 +657,7 @@ public class OverseerTest extends SolrTestCaseJ4 { while (version == getClusterStateVersion(zkClient)); - verifyStatus(reader, Replica.State.ACTIVE); + verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.ACTIVE); version = getClusterStateVersion(zkClient); overseerClient.close(); Thread.sleep(1000); // wait for overseer to get killed @@ -669,7 +669,7 @@ public class OverseerTest extends SolrTestCaseJ4 { while (version == getClusterStateVersion(zkClient)); - verifyStatus(reader, Replica.State.RECOVERING); + verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING); assertEquals("Live nodes count does not match", 1, reader .getClusterState().getLiveNodes().size()); @@ -884,7 +884,7 @@ public class OverseerTest extends SolrTestCaseJ4 { waitForCollections(reader, "collection1"); - verifyStatus(reader, Replica.State.RECOVERING); + verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING); mockController.close(); @@ -1163,6 +1163,111 @@ public class OverseerTest extends SolrTestCaseJ4 { } } + @Test + public void testExternalClusterStateChangeBehavior() throws Exception { + String zkDir = createTempDir("testExternalClusterStateChangeBehavior").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient zkClient = null; + ZkStateReader reader = null; + SolrZkClient overseerClient = null; + + try { + server.run(); + zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT); + + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + ZkController.createClusterZkNodes(zkClient); + + zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true); + + reader = new ZkStateReader(zkClient); + reader.createClusterStateWatchersAndUpdate(); + + overseerClient = electNewOverseer(server.getZkAddress()); + + DistributedQueue q = Overseer.getInQueue(zkClient); + + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), + ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", + ZkStateReader.NODE_NAME_PROP, "node1", + ZkStateReader.COLLECTION_PROP, "c1", + ZkStateReader.CORE_NAME_PROP, "core1", + ZkStateReader.ROLES_PROP, "", + ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); + + q.offer(Utils.toJSON(m)); + + waitForCollections(reader, "c1"); + verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN); + + m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), + ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", + ZkStateReader.NODE_NAME_PROP, "node1", + ZkStateReader.COLLECTION_PROP, "c1", + ZkStateReader.CORE_NAME_PROP, "core1", + ZkStateReader.ROLES_PROP, "", + ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString()); + + q.offer(Utils.toJSON(m)); + + + m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), + ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", + ZkStateReader.NODE_NAME_PROP, "node1", + ZkStateReader.COLLECTION_PROP, "c1", + ZkStateReader.CORE_NAME_PROP, "core1", + ZkStateReader.ROLES_PROP, "", + ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString()); + + q.offer(Utils.toJSON(m)); + + Stat stat = new Stat(); + byte[] data = zkClient.getData("/clusterstate.json", null, stat, true); + // Simulate an external modification + zkClient.setData("/clusterstate.json", data, true); + + m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), + "name", "test", + ZkStateReader.NUM_SHARDS_PROP, "1", + ZkStateReader.REPLICATION_FACTOR, "1", + DocCollection.STATE_FORMAT, "2" + ); + q.offer(Utils.toJSON(m)); + + m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(), + "collection", "test", + ZkStateReader.SHARD_ID_PROP, "x", + ZkStateReader.REPLICATION_FACTOR, "1" + ); + q.offer(Utils.toJSON(m)); + + m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(), + "collection", "test", + ZkStateReader.SHARD_ID_PROP, "x", + ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", + ZkStateReader.NODE_NAME_PROP, "node1", + ZkStateReader.CORE_NAME_PROP, "core1", + ZkStateReader.STATE_PROP, Replica.State.DOWN.toString() + ); + q.offer(Utils.toJSON(m)); + + waitForCollections(reader, "test"); + verifyStatus(reader, "test", "x", "core_node1", Replica.State.DOWN); + + waitForCollections(reader, "c1"); + verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE); + + } finally { + close(zkClient); + close(overseerClient); + close(reader); + server.shutdown(); + } + } + private void close(ZkStateReader reader) { if (reader != null) { reader.close(); diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java index 48142c330bf..43f95dfb86b 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java @@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; +import org.apache.zookeeper.KeeperException; import java.util.HashMap; import java.util.Map; @@ -204,4 +205,160 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 { } + public void testExternalModificationToSharedClusterState() throws Exception { + String zkDir = createTempDir("testExternalModification").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient zkClient = null; + + try { + server.run(); + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + + zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); + ZkController.createClusterZkNodes(zkClient); + + ZkStateReader reader = new ZkStateReader(zkClient); + reader.createClusterStateWatchersAndUpdate(); + + ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); + + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); + + // create collection 1 with stateFormat = 1 + ZkWriteCommand c1 = new ZkWriteCommand("c1", + new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); + writer.enqueueUpdate(reader.getClusterState(), c1, null); + writer.writePendingUpdates(); + + reader.updateClusterState(); + ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object + assertTrue(clusterState.hasCollection("c1")); + assertFalse(clusterState.hasCollection("c2")); + + // Simulate an external modification to /clusterstate.json + byte[] data = zkClient.getData("/clusterstate.json", null, null, true); + zkClient.setData("/clusterstate.json", data, true); + + // enqueue another c1 so that ZkStateWriter has pending updates + writer.enqueueUpdate(clusterState, c1, null); + assertTrue(writer.hasPendingUpdates()); + + // create collection 2 with stateFormat = 1 + ZkWriteCommand c2 = new ZkWriteCommand("c2", + new DocCollection("c2", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2"))); + + try { + writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object + fail("Enqueue should not have succeeded"); + } catch (KeeperException.BadVersionException bve) { + // expected + } + + reader.updateClusterState(); + try { + writer.enqueueUpdate(reader.getClusterState(), c2, null); + fail("enqueueUpdate after BadVersionException should not have suceeded"); + } catch (IllegalStateException e) { + // expected + } + + try { + writer.writePendingUpdates(); + fail("writePendingUpdates after BadVersionException should not have suceeded"); + } catch (IllegalStateException e) { + // expected + } + + } finally { + IOUtils.close(zkClient); + server.shutdown(); + } + } + + public void testExternalModificationToStateFormat2() throws Exception { + String zkDir = createTempDir("testExternalModificationToStateFormat2").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient zkClient = null; + + try { + server.run(); + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + + zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); + ZkController.createClusterZkNodes(zkClient); + + ZkStateReader reader = new ZkStateReader(zkClient); + reader.createClusterStateWatchersAndUpdate(); + + ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); + + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); + + ClusterState state = reader.getClusterState(); + + // create collection 2 with stateFormat = 2 + ZkWriteCommand c2 = new ZkWriteCommand("c2", + new DocCollection("c2", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2"))); + state = writer.enqueueUpdate(reader.getClusterState(), c2, null); + assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately + + int sharedClusterStateVersion = state.getZkClusterStateVersion(); + int stateFormat2Version = state.getCollection("c2").getZNodeVersion(); + + // Simulate an external modification to /collections/c2/state.json + byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true); + zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true); + + // get the most up-to-date state + reader.updateClusterState(); + state = reader.getClusterState(); + assertTrue(state.hasCollection("c2")); + assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion()); + assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion()); + + // enqueue an update to stateFormat2 collection such that update is pending + state = writer.enqueueUpdate(state, c2, null); + assertTrue(writer.hasPendingUpdates()); + + // get the most up-to-date state + reader.updateClusterState(); + state = reader.getClusterState(); + + // enqueue a stateFormat=1 collection which should cause a flush + ZkWriteCommand c1 = new ZkWriteCommand("c1", + new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); + + try { + state = writer.enqueueUpdate(state, c1, null); + fail("Enqueue should not have succeeded"); + } catch (KeeperException.BadVersionException bve) { + // expected + } + + try { + writer.enqueueUpdate(reader.getClusterState(), c2, null); + fail("enqueueUpdate after BadVersionException should not have suceeded"); + } catch (IllegalStateException e) { + // expected + } + + try { + writer.writePendingUpdates(); + fail("writePendingUpdates after BadVersionException should not have suceeded"); + } catch (IllegalStateException e) { + // expected + } + } finally { + IOUtils.close(zkClient); + server.shutdown(); + } + } }