Moved RecoverAfterNodesTests to inherit from AbstractIntegrationTest

This commit is contained in:
Boaz Leskes 2013-09-24 14:32:41 +02:00
parent 0422f75c8d
commit 10de3a7ecb
2 changed files with 122 additions and 96 deletions

View File

@ -19,11 +19,16 @@
package org.elasticsearch.gateway.none; package org.elasticsearch.gateway.none;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.test.AbstractNodesTests; import org.elasticsearch.test.AbstractIntegrationTest;
import org.junit.After; import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@ -33,114 +38,124 @@ import static org.hamcrest.Matchers.hasItem;
/** /**
* *
*/ */
public class RecoverAfterNodesTests extends AbstractNodesTests { @ClusterScope(scope = Scope.TEST, numNodes = 0)
public class RecoverAfterNodesTests extends AbstractIntegrationTest {
private final static TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(1); private final static TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(1);
@After public ImmutableSet<ClusterBlock> waitForNoBlocksOnNode(TimeValue timeout, Client nodeClient) throws InterruptedException {
public void closeNodes() throws Exception { long start = System.currentTimeMillis();
tearDown(); ImmutableSet<ClusterBlock> blocks;
closeAllNodes(); do {
blocks = nodeClient.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA);
}
while (!blocks.isEmpty() && (System.currentTimeMillis() - start) < timeout.millis());
return blocks;
}
public Client startNode(Settings.Builder settings) {
String name = cluster().startNode(settings);
return cluster().clientNodeClient(name);
} }
@Test @Test
public void testRecoverAfterNodes() throws Exception { public void testRecoverAfterNodes() throws Exception {
logger.info("--> start node (1)"); logger.info("--> start node (1)");
startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 3)); Client clientNode1 = startNode(settingsBuilder().put("gateway.recover_after_nodes", 3));
assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(clientNode1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start node (2)"); logger.info("--> start node (2)");
startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 3)); Client clientNode2 = startNode(settingsBuilder().put("gateway.recover_after_nodes", 3));
// Sleeping here for the same time that we wait to check for empty blocks
Thread.sleep(BLOCK_WAIT_TIMEOUT.millis()); Thread.sleep(BLOCK_WAIT_TIMEOUT.millis());
assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(clientNode1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(clientNode2.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start node (3)"); logger.info("--> start node (3)");
startNode("node3", settingsBuilder().put("gateway.recover_after_nodes", 3)); Client clientNode3 = startNode(settingsBuilder().put("gateway.recover_after_nodes", 3));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "node1").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, clientNode1).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "node2").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, clientNode2).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "node3").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, clientNode3).isEmpty(), equalTo(true));
} }
@Test @Test
public void testRecoverAfterMasterNodes() throws Exception { public void testRecoverAfterMasterNodes() throws Exception {
logger.info("--> start master_node (1)"); logger.info("--> start master_node (1)");
startNode("master1", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true)); Client master1 = startNode(settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true));
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start data_node (1)"); logger.info("--> start data_node (1)");
startNode("data1", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false)); Client data1 = startNode(settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false));
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(data1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start data_node (2)"); logger.info("--> start data_node (2)");
startNode("data2", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false)); Client data2 = startNode(settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false));
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(data1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(data2.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start master_node (2)"); logger.info("--> start master_node (2)");
startNode("master2", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true)); Client master2 = startNode(settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "master1").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, master1).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "master2").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, master2).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "data1").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, data1).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "data2").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, data2).isEmpty(), equalTo(true));
} }
@Test @Test
public void testRecoverAfterDataNodes() throws Exception { public void testRecoverAfterDataNodes() throws Exception {
logger.info("--> start master_node (1)"); logger.info("--> start master_node (1)");
startNode("master1", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true)); Client master1 = startNode(settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true));
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start data_node (1)"); logger.info("--> start data_node (1)");
startNode("data1", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false)); Client data1 = startNode(settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false));
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(data1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start master_node (2)"); logger.info("--> start master_node (2)");
startNode("master2", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true)); Client master2 = startNode(settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true));
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master2.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(data1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet() assertThat(master2.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA), .getState().blocks().global(ClusterBlockLevel.METADATA),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)); hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
logger.info("--> start data_node (2)"); logger.info("--> start data_node (2)");
startNode("data2", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false)); Client data2 = startNode(settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "master1").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, master1).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "master2").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, master2).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "data1").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, data1).isEmpty(), equalTo(true));
assertThat(waitForNoBlocks(BLOCK_WAIT_TIMEOUT, "data2").isEmpty(), equalTo(true)); assertThat(waitForNoBlocksOnNode(BLOCK_WAIT_TIMEOUT, data2).isEmpty(), equalTo(true));
} }
} }

View File

@ -70,8 +70,8 @@ public class TestCluster implements Closeable, Iterable<Client> {
protected final ESLogger logger = Loggers.getLogger(getClass()); protected final ESLogger logger = Loggers.getLogger(getClass());
/* sorted map to make traverse order reproducible */ /* sorted map to make traverse order reproducible */
private final TreeMap<String, NodeAndClient> nodes = newTreeMap(); private final TreeMap<String, NodeAndClient> nodes = newTreeMap();
private final Set<File> dataDirToClean = new HashSet<File>(); private final Set<File> dataDirToClean = new HashSet<File>();
private final String clusterName; private final String clusterName;
@ -81,21 +81,22 @@ public class TestCluster implements Closeable, Iterable<Client> {
private final Settings defaultSettings; private final Settings defaultSettings;
private Random random; private Random random;
private AtomicInteger nextNodeId = new AtomicInteger(0); private AtomicInteger nextNodeId = new AtomicInteger(0);
/* We have a fixed number of shared nodes that we keep around across tests */ /* We have a fixed number of shared nodes that we keep around across tests */
private final int numSharedNodes; private final int numSharedNodes;
/* Each shared node has a node seed that is used to start up the node and get default settings /* Each shared node has a node seed that is used to start up the node and get default settings
* this is important if a node is randomly shut down in a test since the next test relies on a * this is important if a node is randomly shut down in a test since the next test relies on a
* fully shared cluster to be more reproducible */ * fully shared cluster to be more reproducible */
private final long[] sharedNodesSeeds; private final long[] sharedNodesSeeds;
private double transportClientRatio = 0.0; private double transportClientRatio = 0.0;
private final Map<Integer, Settings> perNodeSettingsMap; private final Map<Integer, Settings> perNodeSettingsMap;
private static final Map<Integer, Settings> EMPTY = Collections.emptyMap(); private static final Map<Integer, Settings> EMPTY = Collections.emptyMap();
public TestCluster(long clusterSeed, String clusterName) { public TestCluster(long clusterSeed, String clusterName) {
this(clusterSeed, -1, clusterName, EMPTY); this(clusterSeed, -1, clusterName, EMPTY);
} }
@ -116,21 +117,21 @@ public class TestCluster implements Closeable, Iterable<Client> {
for (int i = 0; i < sharedNodesSeeds.length; i++) { for (int i = 0; i < sharedNodesSeeds.length; i++) {
sharedNodesSeeds[i] = random.nextLong(); sharedNodesSeeds[i] = random.nextLong();
} }
logger.info("Setup TestCluster [{}] with seed [{}] using [{}] nodes" , clusterName, SeedUtils.formatSeed(clusterSeed), numSharedNodes); logger.info("Setup TestCluster [{}] with seed [{}] using [{}] nodes", clusterName, SeedUtils.formatSeed(clusterSeed), numSharedNodes);
this.defaultSettings = ImmutableSettings.settingsBuilder() this.defaultSettings = ImmutableSettings.settingsBuilder()
/* use RAM directories in 10% of the runs */ /* use RAM directories in 10% of the runs */
// .put("index.store.type", random.nextInt(10) == 0 ? MockRamIndexStoreModule.class.getName() : MockFSIndexStoreModule.class.getName()) // .put("index.store.type", random.nextInt(10) == 0 ? MockRamIndexStoreModule.class.getName() : MockFSIndexStoreModule.class.getName())
.put("index.store.type", MockFSIndexStoreModule.class.getName()) // no RAM dir for now! .put("index.store.type", MockFSIndexStoreModule.class.getName()) // no RAM dir for now!
.put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName()) .put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName())
.put("cluster.name", clusterName) .put("cluster.name", clusterName)
// decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms // decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms") .put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
// default to non gateway // default to non gateway
.put("gateway.type", "none") .put("gateway.type", "none")
.build(); .build();
this.perNodeSettingsMap = perNodeSettings; this.perNodeSettingsMap = perNodeSettings;
} }
private Settings getSettings(int nodeOrdinal, Settings others) { private Settings getSettings(int nodeOrdinal, Settings others) {
Builder builder = ImmutableSettings.settingsBuilder().put(defaultSettings); Builder builder = ImmutableSettings.settingsBuilder().put(defaultSettings);
Settings settings = perNodeSettingsMap.get(nodeOrdinal); Settings settings = perNodeSettingsMap.get(nodeOrdinal);
@ -142,7 +143,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
return builder.build(); return builder.build();
} }
public static String clusterName(String prefix, String childVMId, long clusterSeed) { public static String clusterName(String prefix, String childVMId, long clusterSeed) {
StringBuilder builder = new StringBuilder(prefix); StringBuilder builder = new StringBuilder(prefix);
builder.append('-').append(NetworkUtils.getLocalAddress().getHostName()); builder.append('-').append(NetworkUtils.getLocalAddress().getHostName());
@ -170,16 +171,16 @@ public class TestCluster implements Closeable, Iterable<Client> {
publishNode(buildNode); publishNode(buildNode);
return buildNode; return buildNode;
} }
private synchronized NodeAndClient getRandomNodeAndClient() { private synchronized NodeAndClient getRandomNodeAndClient() {
Predicate<NodeAndClient> all = Predicates.alwaysTrue(); Predicate<NodeAndClient> all = Predicates.alwaysTrue();
return getRandomNodeAndClient(all); return getRandomNodeAndClient(all);
} }
private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) { private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) {
ensureOpen(); ensureOpen();
Collection<NodeAndClient> values = Collections2.filter(nodes.values(), predicate) ; Collection<NodeAndClient> values = Collections2.filter(nodes.values(), predicate);
if (!values.isEmpty()) { if (!values.isEmpty()) {
int whichOne = random.nextInt(values.size()); int whichOne = random.nextInt(values.size());
for (NodeAndClient nodeAndClient : values) { for (NodeAndClient nodeAndClient : values) {
@ -190,13 +191,13 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
return null; return null;
} }
public synchronized void ensureAtLeastNumNodes(int num) { public synchronized void ensureAtLeastNumNodes(int num) {
int size = nodes.size(); int size = nodes.size();
for (int i = size; i < num; i++) { for (int i = size; i < num; i++) {
logger.info("increasing cluster size from {} to {}", size, num); logger.info("increasing cluster size from {} to {}", size, num);
NodeAndClient buildNode = buildNode(); NodeAndClient buildNode = buildNode();
buildNode.node().start(); buildNode.node().start();
publishNode(buildNode); publishNode(buildNode);
} }
} }
@ -214,11 +215,12 @@ public class TestCluster implements Closeable, Iterable<Client> {
next.close(); next.close();
} }
} }
private NodeAndClient buildNode(Settings settings) { private NodeAndClient buildNode(Settings settings) {
int ord = nextNodeId.getAndIncrement(); int ord = nextNodeId.getAndIncrement();
return buildNode(ord, random.nextLong(), settings); return buildNode(ord, random.nextLong(), settings);
} }
private NodeAndClient buildNode() { private NodeAndClient buildNode() {
int ord = nextNodeId.getAndIncrement(); int ord = nextNodeId.getAndIncrement();
return buildNode(ord, random.nextLong(), null); return buildNode(ord, random.nextLong(), null);
@ -247,8 +249,8 @@ public class TestCluster implements Closeable, Iterable<Client> {
/* Randomly return a client to one of the nodes in the cluster */ /* Randomly return a client to one of the nodes in the cluster */
return getOrBuildRandomNode().client(random); return getOrBuildRandomNode().client(random);
} }
public synchronized Client masterClient() { public synchronized Client masterClient() {
ensureOpen(); ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName())); NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()));
if (randomNodeAndClient != null) { if (randomNodeAndClient != null) {
@ -257,8 +259,8 @@ public class TestCluster implements Closeable, Iterable<Client> {
Assert.fail("No master client found"); Assert.fail("No master client found");
return null; // can't happen return null; // can't happen
} }
public synchronized Client nonMasterClient() { public synchronized Client nonMasterClient() {
ensureOpen(); ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName()))); NodeAndClient randomNodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (randomNodeAndClient != null) { if (randomNodeAndClient != null) {
@ -267,7 +269,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
Assert.fail("No non-master client found"); Assert.fail("No non-master client found");
return null; // can't happen return null; // can't happen
} }
public synchronized Client clientNodeClient() { public synchronized Client clientNodeClient() {
ensureOpen(); ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new ClientNodePredicate()); NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new ClientNodePredicate());
@ -277,7 +279,16 @@ public class TestCluster implements Closeable, Iterable<Client> {
startNodeClient(ImmutableSettings.EMPTY); startNodeClient(ImmutableSettings.EMPTY);
return getRandomNodeAndClient(new ClientNodePredicate()).client(random); return getRandomNodeAndClient(new ClientNodePredicate()).client(random);
} }
public synchronized Client clientNodeClient(String nodeName) {
ensureOpen();
NodeAndClient randomNodeAndClient = nodes.get(nodeName);
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
}
return null;
}
public synchronized Client smartClient() { public synchronized Client smartClient() {
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); NodeAndClient randomNodeAndClient = getRandomNodeAndClient();
if (randomNodeAndClient != null) { if (randomNodeAndClient != null) {
@ -286,10 +297,10 @@ public class TestCluster implements Closeable, Iterable<Client> {
Assert.fail("No smart client found"); Assert.fail("No smart client found");
return null; // can't happen return null; // can't happen
} }
public synchronized Client client(final Predicate<Settings> filterPredicate) { public synchronized Client client(final Predicate<Settings> filterPredicate) {
ensureOpen(); ensureOpen();
final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new Predicate<NodeAndClient>() { final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new Predicate<NodeAndClient>() {
@Override @Override
public boolean apply(NodeAndClient nodeAndClient) { public boolean apply(NodeAndClient nodeAndClient) {
return filterPredicate.apply(nodeAndClient.node.settings()); return filterPredicate.apply(nodeAndClient.node.settings());
@ -350,7 +361,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
return client = clientFactory.client(node, clusterName, random); return client = clientFactory.client(node, clusterName, random);
} }
Client nodeClient() { Client nodeClient() {
if (closed.get()) { if (closed.get()) {
throw new RuntimeException("already closed"); throw new RuntimeException("already closed");
@ -360,7 +371,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
return nodeClient; return nodeClient;
} }
void resetClient() { void resetClient() {
if (closed.get()) { if (closed.get()) {
throw new RuntimeException("already closed"); throw new RuntimeException("already closed");
@ -408,7 +419,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
private boolean sniff; private boolean sniff;
public static TransportClientFactory NO_SNIFF_CLIENT_FACTORY = new TransportClientFactory(false); public static TransportClientFactory NO_SNIFF_CLIENT_FACTORY = new TransportClientFactory(false);
public static TransportClientFactory SNIFF_CLIENT_FACTORY = new TransportClientFactory(true); public static TransportClientFactory SNIFF_CLIENT_FACTORY = new TransportClientFactory(true);
public TransportClientFactory(boolean sniff) { public TransportClientFactory(boolean sniff) {
this.sniff = sniff; this.sniff = sniff;
@ -428,7 +439,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
public class RandomClientFactory extends ClientFactory { public class RandomClientFactory extends ClientFactory {
@Override @Override
public Client client(Node node, String clusterName, Random random) { public Client client(Node node, String clusterName, Random random) {
double nextDouble = random.nextDouble(); double nextDouble = random.nextDouble();
if (nextDouble < transportClientRatio) { if (nextDouble < transportClientRatio) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -443,7 +454,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
} }
} }
public synchronized void beforeTest(Random random, double transportClientRatio) { public synchronized void beforeTest(Random random, double transportClientRatio) {
reset(random, true, transportClientRatio); reset(random, true, transportClientRatio);
} }
@ -463,7 +474,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
logger.debug("Cluster is NOT consistent - restarting shared nodes - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length); logger.debug("Cluster is NOT consistent - restarting shared nodes - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
Set<NodeAndClient> sharedNodes = new HashSet<NodeAndClient>(); Set<NodeAndClient> sharedNodes = new HashSet<NodeAndClient>();
boolean changed = false; boolean changed = false;
for (int i = 0; i < sharedNodesSeeds.length; i++) { for (int i = 0; i < sharedNodesSeeds.length; i++) {
@ -487,7 +498,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
for (NodeAndClient nodeAndClient : sharedNodes) { for (NodeAndClient nodeAndClient : sharedNodes) {
nodes.remove(nodeAndClient.name); nodes.remove(nodeAndClient.name);
} }
// trash the remaining nodes // trash the remaining nodes
final Collection<NodeAndClient> toShutDown = nodes.values(); final Collection<NodeAndClient> toShutDown = nodes.values();
for (NodeAndClient nodeAndClient : toShutDown) { for (NodeAndClient nodeAndClient : toShutDown) {
@ -500,25 +511,25 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
nextNodeId.set(sharedNodesSeeds.length); nextNodeId.set(sharedNodesSeeds.length);
assert numNodes() == sharedNodesSeeds.length; assert numNodes() == sharedNodesSeeds.length;
if (numNodes() > 0) { if (numNodes() > 0) {
client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(sharedNodesSeeds.length)).get(); client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(sharedNodesSeeds.length)).get();
} }
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length); logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
} }
public synchronized void afterTest() { public synchronized void afterTest() {
wipeDataDirectories(); wipeDataDirectories();
resetClients(); /* reset all clients - each test gets it's own client based on the Random instance created above. */ resetClients(); /* reset all clients - each test gets it's own client based on the Random instance created above. */
} }
private void resetClients() { private void resetClients() {
final Collection<NodeAndClient> nodesAndClients = nodes.values(); final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) { for (NodeAndClient nodeAndClient : nodesAndClients) {
nodeAndClient.resetClient(); nodeAndClient.resetClient();
} }
} }
private void wipeDataDirectories() { private void wipeDataDirectories() {
if (!dataDirToClean.isEmpty()) { if (!dataDirToClean.isEmpty()) {
logger.info("Wipe data directory for all nodes locations: {}", this.dataDirToClean); logger.info("Wipe data directory for all nodes locations: {}", this.dataDirToClean);
@ -557,11 +568,11 @@ public class TestCluster implements Closeable, Iterable<Client> {
assert randomNodeAndClient != null; assert randomNodeAndClient != null;
return getInstanceFromNode(clazz, randomNodeAndClient.node); return getInstanceFromNode(clazz, randomNodeAndClient.node);
} }
public synchronized <T> T getInstance(Class<T> clazz) { public synchronized <T> T getInstance(Class<T> clazz) {
return getInstance(clazz, null); return getInstance(clazz, null);
} }
private synchronized <T> T getInstanceFromNode(Class<T> clazz, InternalNode node) { private synchronized <T> T getInstanceFromNode(Class<T> clazz, InternalNode node) {
return node.injector().getInstance(clazz); return node.injector().getInstance(clazz);
} }
@ -579,7 +590,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
nodeAndClient.close(); nodeAndClient.close();
} }
} }
public synchronized void stopRandomNode(final Predicate<Settings> filter) { public synchronized void stopRandomNode(final Predicate<Settings> filter) {
ensureOpen(); ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(new Predicate<TestCluster.NodeAndClient>() { NodeAndClient nodeAndClient = getRandomNodeAndClient(new Predicate<TestCluster.NodeAndClient>() {
@ -594,8 +605,8 @@ public class TestCluster implements Closeable, Iterable<Client> {
nodeAndClient.close(); nodeAndClient.close();
} }
} }
public synchronized void stopCurrentMasterNode() { public synchronized void stopCurrentMasterNode() {
ensureOpen(); ensureOpen();
assert numNodes() > 0; assert numNodes() > 0;
@ -605,7 +616,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
NodeAndClient remove = nodes.remove(masterNodeName); NodeAndClient remove = nodes.remove(masterNodeName);
remove.close(); remove.close();
} }
public void stopRandomNonMasterNode() { public void stopRandomNonMasterNode() {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName()))); NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) { if (nodeAndClient != null) {
@ -638,7 +649,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
return state.nodes().masterNode().name(); return state.nodes().masterNode().name();
} catch (Throwable e) { } catch (Throwable e) {
logger.warn("Can't fetch cluster state" , e); logger.warn("Can't fetch cluster state", e);
throw new RuntimeException("Can't get master node " + e.getMessage(), e); throw new RuntimeException("Can't get master node " + e.getMessage(), e);
} }
} }
@ -689,11 +700,11 @@ public class TestCluster implements Closeable, Iterable<Client> {
} }
})); }));
} }
public String startNode() { public String startNode() {
return startNode(ImmutableSettings.EMPTY); return startNode(ImmutableSettings.EMPTY);
} }
public String startNode(Settings.Builder settings) { public String startNode(Settings.Builder settings) {
return startNode(settings.build()); return startNode(settings.build());
} }
@ -704,7 +715,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
publishNode(buildNode); publishNode(buildNode);
return buildNode.name; return buildNode.name;
} }
private void publishNode(NodeAndClient nodeAndClient) { private void publishNode(NodeAndClient nodeAndClient) {
assert !nodeAndClient.node().isClosed(); assert !nodeAndClient.node().isClosed();
NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class, nodeAndClient.node); NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class, nodeAndClient.node);
@ -714,11 +725,11 @@ public class TestCluster implements Closeable, Iterable<Client> {
nodes.put(nodeAndClient.name, nodeAndClient); nodes.put(nodeAndClient.name, nodeAndClient);
} }
public void resetAllGateways() throws Exception { public void resetAllGateways() throws Exception {
Collection<NodeAndClient> values = this.nodes.values(); Collection<NodeAndClient> values = this.nodes.values();
for (NodeAndClient nodeAndClient : values) { for (NodeAndClient nodeAndClient : values) {
getInstanceFromNode(Gateway.class, ((InternalNode) nodeAndClient.node)).reset(); getInstanceFromNode(Gateway.class, ((InternalNode) nodeAndClient.node)).reset();
} }
} }
@ -726,7 +737,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
reset(random, wipeData, transportClientRatio); reset(random, wipeData, transportClientRatio);
} }
private static final class MasterNodePredicate implements Predicate<NodeAndClient> { private static final class MasterNodePredicate implements Predicate<NodeAndClient> {
private final String masterNodeName; private final String masterNodeName;
@ -739,7 +750,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
return masterNodeName.equals(nodeAndClient.name); return masterNodeName.equals(nodeAndClient.name);
} }
} }
private static final class ClientNodePredicate implements Predicate<NodeAndClient> { private static final class ClientNodePredicate implements Predicate<NodeAndClient> {
@Override @Override
@ -771,5 +782,5 @@ public class TestCluster implements Closeable, Iterable<Client> {
}; };
} }
} }