mirror of https://github.com/apache/lucene.git
SOLR-9346: Always close ZkStateReader
This commit is contained in:
parent
22d24969f5
commit
90e9c76851
|
@ -16,6 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.hadoop;
|
package org.apache.solr.hadoop;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.solr.cloud.ZkController;
|
import org.apache.solr.cloud.ZkController;
|
||||||
|
@ -35,15 +44,6 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts SolrCloud information from ZooKeeper.
|
* Extracts SolrCloud information from ZooKeeper.
|
||||||
*/
|
*/
|
||||||
|
@ -78,8 +78,7 @@ final class ZooKeeperInspector {
|
||||||
}
|
}
|
||||||
SolrZkClient zkClient = getZkClient(zkHost);
|
SolrZkClient zkClient = getZkClient(zkHost);
|
||||||
|
|
||||||
try {
|
try (ZkStateReader zkStateReader = new ZkStateReader(zkClient)) {
|
||||||
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
|
|
||||||
try {
|
try {
|
||||||
// first check for alias
|
// first check for alias
|
||||||
collection = checkForAlias(zkClient, collection);
|
collection = checkForAlias(zkClient, collection);
|
||||||
|
|
|
@ -16,6 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.cloud;
|
package org.apache.solr.cloud;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
@ -36,12 +42,6 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test split phase that occurs when a Collection API split call is made.
|
* Test split phase that occurs when a Collection API split call is made.
|
||||||
*/
|
*/
|
||||||
|
@ -254,6 +254,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
|
||||||
address.replaceAll("/", "_"));
|
address.replaceAll("/", "_"));
|
||||||
overseerElector.setup(ec);
|
overseerElector.setup(ec);
|
||||||
overseerElector.joinElection(ec, false);
|
overseerElector.joinElection(ec, false);
|
||||||
|
reader.close();
|
||||||
return zkClient;
|
return zkClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,6 +118,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
|
||||||
if (!zkClient.isClosed()) {
|
if (!zkClient.isClosed()) {
|
||||||
zkClient.close();
|
zkClient.close();
|
||||||
}
|
}
|
||||||
|
zkStateReader.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,6 +130,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
|
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
|
||||||
|
zkStateReader.close();
|
||||||
zkClient.close();
|
zkClient.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,68 +57,69 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
|
||||||
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
||||||
ZkController.createClusterZkNodes(zkClient);
|
ZkController.createClusterZkNodes(zkClient);
|
||||||
|
|
||||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
|
||||||
reader.createClusterStateWatchersAndUpdate();
|
reader.createClusterStateWatchersAndUpdate();
|
||||||
|
|
||||||
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
||||||
|
|
||||||
assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
|
assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
|
||||||
assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
|
assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
|
||||||
|
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
|
||||||
|
|
||||||
// create new collection with stateFormat = 2
|
// create new collection with stateFormat = 2
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
||||||
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
|
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
|
||||||
assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
|
assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
|
||||||
|
|
||||||
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
||||||
|
|
||||||
ZkWriteCommand c2 = new ZkWriteCommand("c2",
|
ZkWriteCommand c2 = new ZkWriteCommand("c2",
|
||||||
new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
|
new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
|
||||||
assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2));
|
assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2));
|
||||||
|
|
||||||
// simulate three state changes on same collection, all should be batched together before
|
// simulate three state changes on same collection, all should be batched together before
|
||||||
assertFalse(writer.maybeFlushBefore(c1));
|
assertFalse(writer.maybeFlushBefore(c1));
|
||||||
assertFalse(writer.maybeFlushBefore(c1));
|
assertFalse(writer.maybeFlushBefore(c1));
|
||||||
assertFalse(writer.maybeFlushBefore(c1));
|
assertFalse(writer.maybeFlushBefore(c1));
|
||||||
// and after too
|
// and after too
|
||||||
assertFalse(writer.maybeFlushAfter(c1));
|
assertFalse(writer.maybeFlushAfter(c1));
|
||||||
assertFalse(writer.maybeFlushAfter(c1));
|
assertFalse(writer.maybeFlushAfter(c1));
|
||||||
assertFalse(writer.maybeFlushAfter(c1));
|
assertFalse(writer.maybeFlushAfter(c1));
|
||||||
|
|
||||||
// simulate three state changes on two different collections with stateFormat=2, none should be batched
|
// simulate three state changes on two different collections with stateFormat=2, none should be batched
|
||||||
assertFalse(writer.maybeFlushBefore(c1));
|
assertFalse(writer.maybeFlushBefore(c1));
|
||||||
// flushAfter has to be called as it updates the internal batching related info
|
// flushAfter has to be called as it updates the internal batching related info
|
||||||
assertFalse(writer.maybeFlushAfter(c1));
|
assertFalse(writer.maybeFlushAfter(c1));
|
||||||
assertTrue(writer.maybeFlushBefore(c2));
|
assertTrue(writer.maybeFlushBefore(c2));
|
||||||
assertFalse(writer.maybeFlushAfter(c2));
|
assertFalse(writer.maybeFlushAfter(c2));
|
||||||
assertTrue(writer.maybeFlushBefore(c1));
|
assertTrue(writer.maybeFlushBefore(c1));
|
||||||
assertFalse(writer.maybeFlushAfter(c1));
|
assertFalse(writer.maybeFlushAfter(c1));
|
||||||
|
|
||||||
// create a collection in stateFormat = 1 i.e. inside the main cluster state
|
// create a collection in stateFormat = 1 i.e. inside the main cluster state
|
||||||
ZkWriteCommand c3 = new ZkWriteCommand("c3",
|
ZkWriteCommand c3 = new ZkWriteCommand("c3",
|
||||||
new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
||||||
clusterState = writer.enqueueUpdate(clusterState, c3, null);
|
clusterState = writer.enqueueUpdate(clusterState, c3, null);
|
||||||
|
|
||||||
// simulate three state changes in c3, all should be batched
|
// simulate three state changes in c3, all should be batched
|
||||||
for (int i=0; i<3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
|
assertFalse(writer.maybeFlushBefore(c3));
|
||||||
|
assertFalse(writer.maybeFlushAfter(c3));
|
||||||
|
}
|
||||||
|
|
||||||
|
// simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
|
||||||
|
// none should be batched together
|
||||||
assertFalse(writer.maybeFlushBefore(c3));
|
assertFalse(writer.maybeFlushBefore(c3));
|
||||||
assertFalse(writer.maybeFlushAfter(c3));
|
assertFalse(writer.maybeFlushAfter(c3));
|
||||||
|
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
|
||||||
|
assertFalse(writer.maybeFlushAfter(c1));
|
||||||
|
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
|
||||||
|
assertFalse(writer.maybeFlushAfter(c3));
|
||||||
|
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
|
||||||
|
assertFalse(writer.maybeFlushAfter(c2));
|
||||||
}
|
}
|
||||||
|
|
||||||
// simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
|
|
||||||
// none should be batched together
|
|
||||||
assertFalse(writer.maybeFlushBefore(c3));
|
|
||||||
assertFalse(writer.maybeFlushAfter(c3));
|
|
||||||
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
|
|
||||||
assertFalse(writer.maybeFlushAfter(c1));
|
|
||||||
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
|
|
||||||
assertFalse(writer.maybeFlushAfter(c3));
|
|
||||||
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
|
|
||||||
assertFalse(writer.maybeFlushAfter(c2));
|
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.close(zkClient);
|
IOUtils.close(zkClient);
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
|
@ -140,24 +141,25 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
|
||||||
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
||||||
ZkController.createClusterZkNodes(zkClient);
|
ZkController.createClusterZkNodes(zkClient);
|
||||||
|
|
||||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
|
||||||
reader.createClusterStateWatchersAndUpdate();
|
reader.createClusterStateWatchersAndUpdate();
|
||||||
|
|
||||||
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
||||||
|
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
||||||
|
|
||||||
// create new collection with stateFormat = 1
|
// create new collection with stateFormat = 1
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
||||||
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
||||||
|
|
||||||
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
|
|
||||||
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
|
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
|
||||||
assertNotNull(map.get("c1"));
|
assertNotNull(map.get("c1"));
|
||||||
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
|
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
|
||||||
assertFalse(exists);
|
assertFalse(exists);
|
||||||
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.close(zkClient);
|
IOUtils.close(zkClient);
|
||||||
|
@ -181,24 +183,25 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
|
||||||
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
||||||
ZkController.createClusterZkNodes(zkClient);
|
ZkController.createClusterZkNodes(zkClient);
|
||||||
|
|
||||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
|
||||||
reader.createClusterStateWatchersAndUpdate();
|
reader.createClusterStateWatchersAndUpdate();
|
||||||
|
|
||||||
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
||||||
|
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
||||||
|
|
||||||
// create new collection with stateFormat = 2
|
// create new collection with stateFormat = 2
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
||||||
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
|
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
|
||||||
|
|
||||||
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
|
|
||||||
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
|
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
|
||||||
assertNull(map.get("c1"));
|
assertNull(map.get("c1"));
|
||||||
map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
|
map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
|
||||||
assertNotNull(map.get("c1"));
|
assertNotNull(map.get("c1"));
|
||||||
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.close(zkClient);
|
IOUtils.close(zkClient);
|
||||||
|
@ -224,63 +227,64 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
|
||||||
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
||||||
ZkController.createClusterZkNodes(zkClient);
|
ZkController.createClusterZkNodes(zkClient);
|
||||||
|
|
||||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
|
||||||
reader.createClusterStateWatchersAndUpdate();
|
reader.createClusterStateWatchersAndUpdate();
|
||||||
|
|
||||||
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
||||||
|
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
|
||||||
|
|
||||||
// create collection 1 with stateFormat = 1
|
// create collection 1 with stateFormat = 1
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
||||||
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
||||||
writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
writer.enqueueUpdate(reader.getClusterState(), c1, null);
|
||||||
writer.writePendingUpdates();
|
|
||||||
|
|
||||||
reader.forceUpdateCollection("c1");
|
|
||||||
reader.forceUpdateCollection("c2");
|
|
||||||
ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
|
|
||||||
assertTrue(clusterState.hasCollection("c1"));
|
|
||||||
assertFalse(clusterState.hasCollection("c2"));
|
|
||||||
|
|
||||||
// Simulate an external modification to /clusterstate.json
|
|
||||||
byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
|
|
||||||
zkClient.setData("/clusterstate.json", data, true);
|
|
||||||
|
|
||||||
// enqueue another c1 so that ZkStateWriter has pending updates
|
|
||||||
writer.enqueueUpdate(clusterState, c1, null);
|
|
||||||
assertTrue(writer.hasPendingUpdates());
|
|
||||||
|
|
||||||
// create collection 2 with stateFormat = 1
|
|
||||||
ZkWriteCommand c2 = new ZkWriteCommand("c2",
|
|
||||||
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
|
|
||||||
|
|
||||||
try {
|
|
||||||
writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
|
|
||||||
fail("Enqueue should not have succeeded");
|
|
||||||
} catch (KeeperException.BadVersionException bve) {
|
|
||||||
// expected
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
|
||||||
fail("enqueueUpdate after BadVersionException should not have succeeded");
|
|
||||||
} catch (IllegalStateException e) {
|
|
||||||
// expected
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
fail("writePendingUpdates after BadVersionException should not have succeeded");
|
|
||||||
} catch (IllegalStateException e) {
|
|
||||||
// expected
|
|
||||||
}
|
|
||||||
|
|
||||||
|
reader.forceUpdateCollection("c1");
|
||||||
|
reader.forceUpdateCollection("c2");
|
||||||
|
ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
|
||||||
|
assertTrue(clusterState.hasCollection("c1"));
|
||||||
|
assertFalse(clusterState.hasCollection("c2"));
|
||||||
|
|
||||||
|
// Simulate an external modification to /clusterstate.json
|
||||||
|
byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
|
||||||
|
zkClient.setData("/clusterstate.json", data, true);
|
||||||
|
|
||||||
|
// enqueue another c1 so that ZkStateWriter has pending updates
|
||||||
|
writer.enqueueUpdate(clusterState, c1, null);
|
||||||
|
assertTrue(writer.hasPendingUpdates());
|
||||||
|
|
||||||
|
// create collection 2 with stateFormat = 1
|
||||||
|
ZkWriteCommand c2 = new ZkWriteCommand("c2",
|
||||||
|
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
|
||||||
|
|
||||||
|
try {
|
||||||
|
writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
|
||||||
|
fail("Enqueue should not have succeeded");
|
||||||
|
} catch (KeeperException.BadVersionException bve) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
||||||
|
fail("enqueueUpdate after BadVersionException should not have succeeded");
|
||||||
|
} catch (IllegalStateException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
writer.writePendingUpdates();
|
||||||
|
fail("writePendingUpdates after BadVersionException should not have succeeded");
|
||||||
|
} catch (IllegalStateException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.close(zkClient);
|
IOUtils.close(zkClient);
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExternalModificationToStateFormat2() throws Exception {
|
public void testExternalModificationToStateFormat2() throws Exception {
|
||||||
|
@ -298,68 +302,69 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
|
||||||
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
|
||||||
ZkController.createClusterZkNodes(zkClient);
|
ZkController.createClusterZkNodes(zkClient);
|
||||||
|
|
||||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
|
||||||
reader.createClusterStateWatchersAndUpdate();
|
reader.createClusterStateWatchersAndUpdate();
|
||||||
|
|
||||||
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
|
||||||
|
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
|
||||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
|
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
|
||||||
|
|
||||||
ClusterState state = reader.getClusterState();
|
ClusterState state = reader.getClusterState();
|
||||||
|
|
||||||
// create collection 2 with stateFormat = 2
|
// create collection 2 with stateFormat = 2
|
||||||
ZkWriteCommand c2 = new ZkWriteCommand("c2",
|
ZkWriteCommand c2 = new ZkWriteCommand("c2",
|
||||||
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
|
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
|
||||||
state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
||||||
assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
|
assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
|
||||||
|
|
||||||
int sharedClusterStateVersion = state.getZkClusterStateVersion();
|
int sharedClusterStateVersion = state.getZkClusterStateVersion();
|
||||||
int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
|
int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
|
||||||
|
|
||||||
// Simulate an external modification to /collections/c2/state.json
|
// Simulate an external modification to /collections/c2/state.json
|
||||||
byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
|
byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
|
||||||
zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
|
zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
|
||||||
|
|
||||||
// get the most up-to-date state
|
// get the most up-to-date state
|
||||||
reader.forceUpdateCollection("c2");
|
reader.forceUpdateCollection("c2");
|
||||||
state = reader.getClusterState();
|
state = reader.getClusterState();
|
||||||
log.info("Cluster state: {}", state);
|
log.info("Cluster state: {}", state);
|
||||||
assertTrue(state.hasCollection("c2"));
|
assertTrue(state.hasCollection("c2"));
|
||||||
assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
|
assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
|
||||||
assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
|
assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
|
||||||
|
|
||||||
// enqueue an update to stateFormat2 collection such that update is pending
|
// enqueue an update to stateFormat2 collection such that update is pending
|
||||||
state = writer.enqueueUpdate(state, c2, null);
|
state = writer.enqueueUpdate(state, c2, null);
|
||||||
assertTrue(writer.hasPendingUpdates());
|
assertTrue(writer.hasPendingUpdates());
|
||||||
|
|
||||||
// get the most up-to-date state
|
// get the most up-to-date state
|
||||||
reader.forceUpdateCollection("c2");
|
reader.forceUpdateCollection("c2");
|
||||||
state = reader.getClusterState();
|
state = reader.getClusterState();
|
||||||
|
|
||||||
// enqueue a stateFormat=1 collection which should cause a flush
|
// enqueue a stateFormat=1 collection which should cause a flush
|
||||||
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
ZkWriteCommand c1 = new ZkWriteCommand("c1",
|
||||||
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
writer.enqueueUpdate(state, c1, null);
|
writer.enqueueUpdate(state, c1, null);
|
||||||
fail("Enqueue should not have succeeded");
|
fail("Enqueue should not have succeeded");
|
||||||
} catch (KeeperException.BadVersionException bve) {
|
} catch (KeeperException.BadVersionException bve) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
writer.enqueueUpdate(reader.getClusterState(), c2, null);
|
||||||
fail("enqueueUpdate after BadVersionException should not have succeeded");
|
fail("enqueueUpdate after BadVersionException should not have succeeded");
|
||||||
} catch (IllegalStateException e) {
|
} catch (IllegalStateException e) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
writer.writePendingUpdates();
|
writer.writePendingUpdates();
|
||||||
fail("writePendingUpdates after BadVersionException should not have succeeded");
|
fail("writePendingUpdates after BadVersionException should not have succeeded");
|
||||||
} catch (IllegalStateException e) {
|
} catch (IllegalStateException e) {
|
||||||
// expected
|
// expected
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.close(zkClient);
|
IOUtils.close(zkClient);
|
||||||
|
|
Loading…
Reference in New Issue