SOLR-9346: Always close ZkStateReader

This commit is contained in:
Alan Woodward 2016-07-27 16:45:59 +01:00
parent 219406d912
commit 78ebcd3cf5
5 changed files with 195 additions and 188 deletions

View File

@ -16,6 +16,15 @@
*/ */
package org.apache.solr.hadoop; package org.apache.solr.hadoop;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import com.google.common.io.Files; import com.google.common.io.Files;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
@ -35,15 +44,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/** /**
* Extracts SolrCloud information from ZooKeeper. * Extracts SolrCloud information from ZooKeeper.
*/ */
@ -78,8 +78,7 @@ final class ZooKeeperInspector {
} }
SolrZkClient zkClient = getZkClient(zkHost); SolrZkClient zkClient = getZkClient(zkHost);
try { try (ZkStateReader zkStateReader = new ZkStateReader(zkClient)) {
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
try { try {
// first check for alias // first check for alias
collection = checkForAlias(zkClient, collection); collection = checkForAlias(zkClient, collection);

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
@ -36,12 +42,6 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Test split phase that occurs when a Collection API split call is made. * Test split phase that occurs when a Collection API split call is made.
*/ */
@ -254,6 +254,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
address.replaceAll("/", "_")); address.replaceAll("/", "_"));
overseerElector.setup(ec); overseerElector.setup(ec);
overseerElector.joinElection(ec, false); overseerElector.joinElection(ec, false);
reader.close();
return zkClient; return zkClient;
} }

View File

@ -118,6 +118,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
if (!zkClient.isClosed()) { if (!zkClient.isClosed()) {
zkClient.close(); zkClient.close();
} }
zkStateReader.close();
} }
} }

View File

@ -130,6 +130,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
} }
} }
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName); deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
zkStateReader.close();
zkClient.close(); zkClient.close();
} }

View File

@ -57,68 +57,69 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient); ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient); try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null))); assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null))); assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
// create new collection with stateFormat = 2 // create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1")); new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1)); assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null); ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
ZkWriteCommand c2 = new ZkWriteCommand("c2", ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2")); new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2)); assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2));
// simulate three state changes on same collection, all should be batched together before // simulate three state changes on same collection, all should be batched together before
assertFalse(writer.maybeFlushBefore(c1)); assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushBefore(c1)); assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushBefore(c1)); assertFalse(writer.maybeFlushBefore(c1));
// and after too // and after too
assertFalse(writer.maybeFlushAfter(c1)); assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushAfter(c1)); assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushAfter(c1)); assertFalse(writer.maybeFlushAfter(c1));
// simulate three state changes on two different collections with stateFormat=2, none should be batched // simulate three state changes on two different collections with stateFormat=2, none should be batched
assertFalse(writer.maybeFlushBefore(c1)); assertFalse(writer.maybeFlushBefore(c1));
// flushAfter has to be called as it updates the internal batching related info // flushAfter has to be called as it updates the internal batching related info
assertFalse(writer.maybeFlushAfter(c1)); assertFalse(writer.maybeFlushAfter(c1));
assertTrue(writer.maybeFlushBefore(c2)); assertTrue(writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2)); assertFalse(writer.maybeFlushAfter(c2));
assertTrue(writer.maybeFlushBefore(c1)); assertTrue(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1)); assertFalse(writer.maybeFlushAfter(c1));
// create a collection in stateFormat = 1 i.e. inside the main cluster state // create a collection in stateFormat = 1 i.e. inside the main cluster state
ZkWriteCommand c3 = new ZkWriteCommand("c3", ZkWriteCommand c3 = new ZkWriteCommand("c3",
new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
clusterState = writer.enqueueUpdate(clusterState, c3, null); clusterState = writer.enqueueUpdate(clusterState, c3, null);
// simulate three state changes in c3, all should be batched // simulate three state changes in c3, all should be batched
for (int i=0; i<3; i++) { for (int i = 0; i < 3; i++) {
assertFalse(writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
}
// simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
// none should be batched together
assertFalse(writer.maybeFlushBefore(c3)); assertFalse(writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3)); assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2));
} }
// simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
// none should be batched together
assertFalse(writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2));
} finally { } finally {
IOUtils.close(zkClient); IOUtils.close(zkClient);
server.shutdown(); server.shutdown();
@ -140,24 +141,25 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient); ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient); try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
// create new collection with stateFormat = 1 // create new collection with stateFormat = 1
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null); ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
writer.writePendingUpdates(); writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
assertNotNull(map.get("c1")); assertNotNull(map.get("c1"));
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true); boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertFalse(exists); assertFalse(exists);
}
} finally { } finally {
IOUtils.close(zkClient); IOUtils.close(zkClient);
@ -181,24 +183,25 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient); ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient); try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
// create new collection with stateFormat = 2 // create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null); ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
writer.writePendingUpdates(); writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
assertNull(map.get("c1")); assertNull(map.get("c1"));
map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true)); map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
assertNotNull(map.get("c1")); assertNotNull(map.get("c1"));
}
} finally { } finally {
IOUtils.close(zkClient); IOUtils.close(zkClient);
@ -224,63 +227,64 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient); ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient); try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
// create collection 1 with stateFormat = 1 // create collection 1 with stateFormat = 1
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.enqueueUpdate(reader.getClusterState(), c1, null);
writer.writePendingUpdates();
reader.forceUpdateCollection("c1");
reader.forceUpdateCollection("c2");
ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
assertTrue(clusterState.hasCollection("c1"));
assertFalse(clusterState.hasCollection("c2"));
// Simulate an external modification to /clusterstate.json
byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
zkClient.setData("/clusterstate.json", data, true);
// enqueue another c1 so that ZkStateWriter has pending updates
writer.enqueueUpdate(clusterState, c1, null);
assertTrue(writer.hasPendingUpdates());
// create collection 2 with stateFormat = 1
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
try {
writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) {
// expected
}
try {
writer.enqueueUpdate(reader.getClusterState(), c2, null);
fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) {
// expected
}
try {
writer.writePendingUpdates(); writer.writePendingUpdates();
fail("writePendingUpdates after BadVersionException should not have succeeded");
} catch (IllegalStateException e) {
// expected
}
reader.forceUpdateCollection("c1");
reader.forceUpdateCollection("c2");
ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
assertTrue(clusterState.hasCollection("c1"));
assertFalse(clusterState.hasCollection("c2"));
// Simulate an external modification to /clusterstate.json
byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
zkClient.setData("/clusterstate.json", data, true);
// enqueue another c1 so that ZkStateWriter has pending updates
writer.enqueueUpdate(clusterState, c1, null);
assertTrue(writer.hasPendingUpdates());
// create collection 2 with stateFormat = 1
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
try {
writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) {
// expected
}
try {
writer.enqueueUpdate(reader.getClusterState(), c2, null);
fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) {
// expected
}
try {
writer.writePendingUpdates();
fail("writePendingUpdates after BadVersionException should not have succeeded");
} catch (IllegalStateException e) {
// expected
}
}
} finally { } finally {
IOUtils.close(zkClient); IOUtils.close(zkClient);
server.shutdown(); server.shutdown();
} }
} }
public void testExternalModificationToStateFormat2() throws Exception { public void testExternalModificationToStateFormat2() throws Exception {
@ -298,68 +302,69 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient); ZkController.createClusterZkNodes(zkClient);
ZkStateReader reader = new ZkStateReader(zkClient); try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
ClusterState state = reader.getClusterState(); ClusterState state = reader.getClusterState();
// create collection 2 with stateFormat = 2 // create collection 2 with stateFormat = 2
ZkWriteCommand c2 = new ZkWriteCommand("c2", ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2"))); new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
state = writer.enqueueUpdate(reader.getClusterState(), c2, null); state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
int sharedClusterStateVersion = state.getZkClusterStateVersion(); int sharedClusterStateVersion = state.getZkClusterStateVersion();
int stateFormat2Version = state.getCollection("c2").getZNodeVersion(); int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
// Simulate an external modification to /collections/c2/state.json // Simulate an external modification to /collections/c2/state.json
byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true); byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true); zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
// get the most up-to-date state // get the most up-to-date state
reader.forceUpdateCollection("c2"); reader.forceUpdateCollection("c2");
state = reader.getClusterState(); state = reader.getClusterState();
log.info("Cluster state: {}", state); log.info("Cluster state: {}", state);
assertTrue(state.hasCollection("c2")); assertTrue(state.hasCollection("c2"));
assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion()); assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion()); assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
// enqueue an update to stateFormat2 collection such that update is pending // enqueue an update to stateFormat2 collection such that update is pending
state = writer.enqueueUpdate(state, c2, null); state = writer.enqueueUpdate(state, c2, null);
assertTrue(writer.hasPendingUpdates()); assertTrue(writer.hasPendingUpdates());
// get the most up-to-date state // get the most up-to-date state
reader.forceUpdateCollection("c2"); reader.forceUpdateCollection("c2");
state = reader.getClusterState(); state = reader.getClusterState();
// enqueue a stateFormat=1 collection which should cause a flush // enqueue a stateFormat=1 collection which should cause a flush
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
try { try {
writer.enqueueUpdate(state, c1, null); writer.enqueueUpdate(state, c1, null);
fail("Enqueue should not have succeeded"); fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) { } catch (KeeperException.BadVersionException bve) {
// expected // expected
} }
try { try {
writer.enqueueUpdate(reader.getClusterState(), c2, null); writer.enqueueUpdate(reader.getClusterState(), c2, null);
fail("enqueueUpdate after BadVersionException should not have succeeded"); fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// expected // expected
} }
try { try {
writer.writePendingUpdates(); writer.writePendingUpdates();
fail("writePendingUpdates after BadVersionException should not have succeeded"); fail("writePendingUpdates after BadVersionException should not have succeeded");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// expected // expected
}
} }
} finally { } finally {
IOUtils.close(zkClient); IOUtils.close(zkClient);