[Tests] Added ServiceDisruptionScheme(s) and testAckedIndexing

This commit adds the notion of ServiceDisruptionScheme, which allows disruptions to be introduced in our test cluster. This
abstraction is used in a couple of wrappers around the functionality offered by MockTransportService to simulate various
network partitions. There is also one implementation for causing a node to be slow in processing cluster state updates.

This new mechanism is integrated into the existing DiscoveryWithNetworkFailuresTests tests.

A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions.

Closes #6505
This commit is contained in:
Boaz Leskes 2014-05-16 22:09:39 +02:00
parent 5d13571dbe
commit 28489cee45
19 changed files with 1149 additions and 161 deletions

View File

@ -342,7 +342,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
logger.error("unexpected failure during [{}]", t, source);
}
@Override
@ -408,8 +408,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing {} leave request as we are no longer master", node);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@ -448,8 +447,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@ -486,8 +484,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}

View File

@ -245,6 +245,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
/**
 * Looks up the request handler stored in {@code serverHandlers} under the given action name.
 *
 * @param action the action name used as the lookup key
 * @return whatever {@code serverHandlers.get(action)} yields
 *         (NOTE(review): presumably {@code null} when no handler is registered - confirm against the map type)
 */
protected TransportRequestHandler getHandler(String action) {
return serverHandlers.get(action);
}
class Adapter implements TransportServiceAdapter {
final MeanMetric rxMetric = new MeanMetric();

View File

@ -263,12 +263,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
.put("discovery.type", "local")
.build();
ListenableFuture<String> master = cluster().startNodeAsync(settings);
ListenableFuture<String> nonMaster = cluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build());
ListenableFuture<String> master = internalCluster().startNodeAsync(settings);
ListenableFuture<String> nonMaster = internalCluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build());
master.get();
ensureGreen(); // make sure we have a cluster
ClusterService clusterService = cluster().getInstance(ClusterService.class, nonMaster.get());
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nonMaster.get());
final boolean[] taskFailed = {false};
final CountDownLatch latch1 = new CountDownLatch(1);

View File

@ -21,9 +21,9 @@ package org.elasticsearch.cluster;
import org.elasticsearch.action.ActionRequestBuilder;
import com.google.common.base.Predicate;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -48,11 +48,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExis
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
/**
*/

View File

@ -20,6 +20,8 @@
package org.elasticsearch.discovery;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
@ -41,16 +43,20 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.disruption.*;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
@ -108,38 +114,36 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assert unluckyNode != null;
// Simulate a network issue between the unlucky node and elected master node in both directions.
addFailToSendNoConnectRule(masterDiscoNode.getName(), unluckyNode);
addFailToSendNoConnectRule(unluckyNode, masterDiscoNode.getName());
try {
// Wait until elected master has removed that the unlucky node...
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
final Client isolatedNodeClient = internalCluster().client(unluckyNode);
// It may a take a bit before the node detects it has been cut off from the elected master
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
} finally {
// stop simulating network failures, from this point on the unlucky node is able to rejoin
// We also need to do this even if assertions fail, since otherwise the test framework can't work properly
clearNoConnectRule(masterDiscoNode.getName(), unluckyNode);
clearNoConnectRule(unluckyNode, masterDiscoNode.getName());
}
NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterDiscoNode.name(), unluckyNode, getRandom());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
final Client isolatedNodeClient = internalCluster().client(unluckyNode);
// It may a take a bit before the node detects it has been cut off from the elected master
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3);
@ -193,80 +197,78 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
// (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down)
ensureGreen("test");
// Pick a node that isn't the elected master.
final String isolatedNode = nodes.get(0);
final String nonIsolatedNode = nodes.get(1);
NetworkPartition networkPartition = addRandomPartition();
final String isolatedNode = networkPartition.getMinoritySide().get(0);
final String nonIsolatedNode = networkPartition.getMjaoritySide().get(0);
// Simulate a network issue between the unlucky node and the rest of the cluster.
randomIsolateNode(isolatedNode, nodes);
try {
logger.info("wait until elected master has removed [{}]", isolatedNode);
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
networkPartition.startDisrupting();
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
// It may a take a bit before the node detects it has been cut off from the elected master
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ensureStableCluster(2, nonIsolatedNode);
// Reads on the right side of the split must work
logger.info("verifying healthy part of cluster returns data");
searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
logger.info("wait until elected master has removed [{}]", isolatedNode);
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// Reads on the wrong side of the split are partial
logger.info("verifying isolated node [{}] returns partial data", isolatedNode);
searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC).setPreference("_only_local")
.get();
assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards()));
assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length));
logger.info("verifying writes on healthy cluster");
UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
assertThat(updateResponse.getVersion(), equalTo(2l));
try {
logger.info("verifying writes on isolated [{}] fail", isolatedNode);
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
.get();
fail();
} catch (ClusterBlockException exception) {
assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(exception.blocks().size(), equalTo(1));
ClusterBlock clusterBlock = exception.blocks().iterator().next();
assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
// It may a take a bit before the node detects it has been cut off from the elected master
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
} finally {
// stop simulating network failures, from this point on the unlucky node is able to rejoin
// We also need to do this even if assertions fail, since otherwise the test framework can't work properly
restoreIsolation(isolatedNode, nodes);
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ensureStableCluster(2, nonIsolatedNode);
// Reads on the right side of the split must work
logger.info("verifying healthy part of cluster returns data");
searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
// Reads on the wrong side of the split are partial
logger.info("verifying isolated node [{}] returns partial data", isolatedNode);
searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC).setPreference("_only_local")
.get();
assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards()));
assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length));
logger.info("verifying writes on healthy cluster");
UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
assertThat(updateResponse.getVersion(), equalTo(2l));
try {
logger.info("verifying writes on isolated [{}] fail", isolatedNode);
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
.get();
fail();
} catch (ClusterBlockException exception) {
assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(exception.blocks().size(), equalTo(1));
ClusterBlock clusterBlock = exception.blocks().iterator().next();
assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID));
}
networkPartition.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3);
@ -316,13 +318,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
break;
}
}
randomIsolateNode(isolatedNode, nodes);
ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode);
scheme.startDisrupting();
// make sure cluster reforms
ensureStableCluster(2, nonIsolatedNode);
// restore isolation
restoreIsolation(isolatedNode, nodes);
scheme.stopDisrupting();
ensureStableCluster(3);
@ -356,7 +359,120 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
}
/**
 * Verifies that every document whose index request returned without an exception can be fetched from each node
 * once a randomly chosen disruption has been stopped and the cluster has become stable and green again.
 * <p>
 * One background indexer thread is started per node. Each indexer waits on its own semaphore for a budget of
 * documents to index and counts down a shared latch after every attempt, whether it succeeded or not. The main
 * thread hands out budgets, waits for the latch, and then validates the acked documents. The disruption is
 * started and stopped around a random number of such rounds.
 *
 * @throws Exception if starting the nodes, waiting for the indexers or joining them fails
 */
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "MvG will fix")
public void testAckedIndexing() throws Exception {
    final List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
    ensureStableCluster(3);

    assertAcked(prepareCreate("test")
            .setSettings(ImmutableSettings.builder()
                    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
            ));
    ensureGreen();

    ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
    logger.info("disruption scheme [{}] added", disruptionScheme);

    final ConcurrentHashMap<String, String> ackedDocs = new ConcurrentHashMap<>(); // id -> node sent.
    final AtomicBoolean stop = new AtomicBoolean(false);
    List<Thread> indexers = new ArrayList<>(nodes.size());
    List<Semaphore> semaphores = new ArrayList<>(nodes.size());
    final AtomicInteger idGenerator = new AtomicInteger(0);
    final AtomicReference<CountDownLatch> countDownLatch = new AtomicReference<>();

    // Everything from here on runs under try/finally so that the indexer threads are always stopped, even when
    // an assertion fails; otherwise they would keep polling their semaphores while later tests run.
    try {
        logger.info("starting indexers");
        for (final String node : nodes) {
            final Semaphore semaphore = new Semaphore(0);
            semaphores.add(semaphore);
            final Client client = client(node);
            final String name = "indexer_" + indexers.size();
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stop.get()) {
                        try {
                            // wake up periodically so that the stop flag is re-checked
                            if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
                                continue;
                            }
                            try {
                                String id = Integer.toString(idGenerator.incrementAndGet());
                                logger.trace("[{}] indexing id [{}] through node [{}]", name, id, node);
                                client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get();
                                // only reached when the index request did not throw, i.e. it was acked
                                ackedDocs.put(id, node);
                            } finally {
                                // count every attempt so the main thread never waits for a failed request
                                countDownLatch.get().countDown();
                                logger.trace("[{}] decreased counter : {}", name, countDownLatch.get().getCount());
                            }
                        } catch (ElasticsearchException | InterruptedException e) {
                            // expected: indexing may fail while disrupted, and the interrupt is only sent
                            // after the stop flag has been set, so the loop condition ends the thread
                        } catch (Throwable t) {
                            logger.info("unexpected exception in background thread of [{}]", t, node);
                        }
                    }
                }
            });

            thread.setName(name);
            thread.setDaemon(true);
            thread.start();
            indexers.add(thread);
        }

        logger.info("indexing some docs before partition");
        int docsPerIndexer = randomInt(3);
        countDownLatch.set(new CountDownLatch(docsPerIndexer * indexers.size()));
        for (Semaphore semaphore : semaphores) {
            semaphore.release(docsPerIndexer);
        }
        assertTrue(countDownLatch.get().await(1, TimeUnit.MINUTES));

        for (int iter = 1 + randomInt(2); iter > 0; iter--) {
            logger.info("starting disruptions & indexing (iteration [{}])", iter);
            disruptionScheme.startDisrupting();

            docsPerIndexer = 1 + randomInt(5);
            countDownLatch.set(new CountDownLatch(docsPerIndexer * indexers.size()));
            // use the test's seeded random so the release order can be reproduced from the test seed
            Collections.shuffle(semaphores, getRandom());
            for (Semaphore semaphore : semaphores) {
                semaphore.release(docsPerIndexer);
            }
            assertTrue(countDownLatch.get().await(1, TimeUnit.MINUTES));

            logger.info("stopping disruption");
            disruptionScheme.stopDisrupting();

            ensureStableCluster(3);
            ensureGreen("test");

            logger.info("validating successful docs");
            for (String node : nodes) {
                try {
                    logger.debug("validating through node [{}]", node);
                    for (Map.Entry<String, String> acked : ackedDocs.entrySet()) {
                        assertTrue("doc [" + acked.getKey() + "] indexed via node [" + acked.getValue() + "] not found",
                                client(node).prepareGet("test", "type", acked.getKey()).setPreference("_local").get().isExists());
                    }
                } catch (AssertionError e) {
                    // add the node that served the read; the original error is kept as the cause
                    throw new AssertionError(e.getMessage() + " (checked via node [" + node + "])", e);
                }
            }

            logger.info("done validating (iteration [{}])", iter);
        }
    } finally {
        logger.info("shutting down indexers");
        stop.set(true);
        for (Thread indexer : indexers) {
            indexer.interrupt();
            indexer.join(60000);
        }
    }
}
@ -379,7 +495,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
String isolatedNode = nodes.get(0);
String notIsolatedNode = nodes.get(1);
randomIsolateNode(isolatedNode, nodes);
ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode);
scheme.startDisrupting();
ensureStableCluster(2, notIsolatedNode);
assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
@ -395,7 +512,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
restoreIsolation(isolatedNode, nodes);
scheme.stopDisrupting();
ensureStableCluster(3);
ensureGreen("test");
@ -411,30 +528,47 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
protected void restoreIsolation(String isolatedNode, List<String> nodes) {
logger.info("restoring isolation of [{}]", isolatedNode);
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
clearNoConnectRule(nodeId, isolatedNode);
clearNoConnectRule(isolatedNode, nodeId);
}
/**
 * Creates a network partition of a randomly chosen flavour (unresponsive or disconnecting) and registers it
 * as the disruption scheme of the test cluster.
 *
 * @return the partition that was registered
 */
protected NetworkPartition addRandomPartition() {
    final NetworkPartition chosen = randomBoolean()
            ? new NetworkUnresponsivePartition(getRandom())
            : new NetworkDisconnectPartition(getRandom());
    setDisruptionScheme(chosen);
    return chosen;
}
protected void randomIsolateNode(String isolatedNode, List<String> nodes) {
boolean unresponsive = randomBoolean();
logger.info("isolating [{}] with unresponsive: [{}]", isolatedNode, unresponsive);
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
if (unresponsive) {
addUnresponsiveRule(nodeId, isolatedNode);
addUnresponsiveRule(isolatedNode, nodeId);
} else {
addFailToSendNoConnectRule(nodeId, isolatedNode);
addFailToSendNoConnectRule(isolatedNode, nodeId);
}
}
/**
 * Builds a two-sided partition with {@code isolatedNode} alone on one side and all other node names reported by
 * the internal cluster on the other. The partition flavour (unresponsive or disconnecting) is picked at random,
 * and the partition is registered as the cluster's disruption scheme.
 *
 * @param isolatedNode name of the node to place alone on one side of the partition
 * @return the partition that was registered
 */
protected NetworkPartition addRandomIsolation(String isolatedNode) {
    final Set<String> isolatedSide = new HashSet<>();
    final Set<String> remainingSide = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
    isolatedSide.add(isolatedNode);
    remainingSide.remove(isolatedNode);

    final NetworkPartition chosen = randomBoolean()
            ? new NetworkUnresponsivePartition(isolatedSide, remainingSide, getRandom())
            : new NetworkDisconnectPartition(isolatedSide, remainingSide, getRandom());

    internalCluster().setDisruptionScheme(chosen);
    return chosen;
}
/**
 * Picks one of the known disruption schemes at random and registers it as the disruption scheme of the
 * test cluster.
 *
 * @return the scheme that was registered
 */
private ServiceDisruptionScheme addRandomDisruptionScheme() {
    List<ServiceDisruptionScheme> list = Arrays.asList(
            new NetworkUnresponsivePartition(getRandom()),
            new NetworkDelaysPartition(getRandom()),
            new NetworkDisconnectPartition(getRandom()),
            new SlowClusterStateProcessing(getRandom())
    );
    // Shuffle with the test's seeded random: the no-arg overload uses its own unseeded source, which would make
    // the chosen scheme impossible to reproduce from a failing test's seed.
    Collections.shuffle(list, getRandom());
    ServiceDisruptionScheme chosen = list.get(0);
    setDisruptionScheme(chosen);
    return chosen;
}
private DiscoveryNode findMasterNode(List<String> nodes) {
@ -452,21 +586,6 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
return masterDiscoNode;
}
private void addFailToSendNoConnectRule(String fromNode, String toNode) {
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode);
((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode());
}
private void addUnresponsiveRule(String fromNode, String toNode) {
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode);
((MockTransportService) mockTransportService).addUnresponsiveRule(internalCluster().getInstance(Discovery.class, toNode).localNode());
}
private void clearNoConnectRule(String fromNode, String toNode) {
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode);
((MockTransportService) mockTransportService).clearRule(internalCluster().getInstance(Discovery.class, toNode).localNode());
}
private void ensureStableCluster(int nodeCount) {
ensureStableCluster(nodeCount, null);

View File

@ -43,7 +43,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;
public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {

View File

@ -217,7 +217,7 @@ public class BackgroundIndexer implements AutoCloseable {
setBudget(numOfDocs);
}
/** Stop all background threads **/
/** Stop all background threads * */
public void stop() throws InterruptedException {
if (stop.get()) {
return;

View File

@ -97,6 +97,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.hamcrest.Matchers;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.junit.*;
import java.io.IOException;
@ -583,6 +584,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
boolean success = false;
try {
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
clearDisruptionScheme();
final Scope currentClusterScope = getCurrentClusterScope();
try {
if (currentClusterScope != Scope.TEST) {
@ -696,6 +698,15 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return between(minimumNumberOfReplicas(), maximumNumberOfReplicas());
}
/**
 * Registers the given disruption scheme by forwarding it to {@code internalCluster().setDisruptionScheme(scheme)}.
 *
 * @param scheme the disruption scheme to hand to the internal cluster
 */
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
internalCluster().setDisruptionScheme(scheme);
}
/**
 * Clears the current disruption scheme by forwarding to {@code internalCluster().clearDisruptionScheme()}.
 */
public void clearDisruptionScheme() {
internalCluster().clearDisruptionScheme();
}
/**
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.

View File

@ -76,6 +76,7 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransport;
@ -185,6 +186,8 @@ public final class InternalTestCluster extends TestCluster {
private final boolean hasFilterCache;
private ServiceDisruptionScheme activeDisruptionScheme;
/**
 * Convenience constructor that delegates to the full constructor, passing the {@code DEFAULT_*} constants and
 * {@code SettingsSource.EMPTY} for every argument other than the seed and the cluster name.
 *
 * @param clusterSeed forwarded unchanged as the first argument of the full constructor
 * @param clusterName forwarded unchanged as the cluster name argument of the full constructor
 */
public InternalTestCluster(long clusterSeed, String clusterName) {
this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, SettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES);
}
@ -288,6 +291,10 @@ public final class InternalTestCluster extends TestCluster {
return clusterName;
}
/**
 * Returns the keys of {@code nodes} copied into a new array.
 * <p>
 * NOTE(review): this accessor is not {@code synchronized}, unlike e.g. {@code dataNodeAndClients()} - confirm
 * that callers cannot race with nodes being started or closed.
 *
 * @return the current key set of {@code nodes} as a {@code String} array
 */
public String[] getNodeNames() {
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
}
private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
@ -486,6 +493,7 @@ public final class InternalTestCluster extends TestCluster {
while (limit.hasNext()) {
NodeAndClient next = limit.next();
nodesToRemove.add(next);
removeDistruptionSchemeFromNode(next);
next.close();
}
for (NodeAndClient toRemove : nodesToRemove) {
@ -660,6 +668,10 @@ public final class InternalTestCluster extends TestCluster {
@Override
public void close() {
if (this.open.compareAndSet(true, false)) {
if (activeDisruptionScheme != null) {
activeDisruptionScheme.testClusterClosed();
activeDisruptionScheme = null;
}
IOUtils.closeWhileHandlingException(nodes.values());
nodes.clear();
executor.shutdownNow();
@ -824,6 +836,7 @@ public final class InternalTestCluster extends TestCluster {
}
private synchronized void reset(boolean wipeData) throws IOException {
clearDisruptionScheme();
randomlyResetClients();
if (wipeData) {
wipeDataDirectories();
@ -1023,6 +1036,7 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@ -1042,6 +1056,7 @@ public final class InternalTestCluster extends TestCluster {
});
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@ -1056,6 +1071,7 @@ public final class InternalTestCluster extends TestCluster {
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
removeDistruptionSchemeFromNode(nodes.get(masterNodeName));
NodeAndClient remove = nodes.remove(masterNodeName);
remove.close();
}
@ -1067,6 +1083,7 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@ -1120,6 +1137,9 @@ public final class InternalTestCluster extends TestCluster {
if (!callback.doRestart(nodeAndClient.name)) {
logger.info("Closing node [{}] during restart", nodeAndClient.name);
toRemove.add(nodeAndClient);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.close();
}
}
@ -1134,18 +1154,33 @@ public final class InternalTestCluster extends TestCluster {
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Restarting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
} else {
int numNodesRestarted = 0;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.node.close();
}
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Starting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
}
}
@ -1343,6 +1378,7 @@ public final class InternalTestCluster extends TestCluster {
dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
}
nodes.put(nodeAndClient.name, nodeAndClient);
applyDisruptionSchemeToNode(nodeAndClient);
}
public void closeNonSharedNodes(boolean wipeData) throws IOException {
@ -1364,6 +1400,33 @@ public final class InternalTestCluster extends TestCluster {
return hasFilterCache;
}
/**
 * Makes {@code scheme} the active disruption scheme of this cluster.
 * Any scheme that is currently active is cleared first; the new scheme is then
 * applied to the cluster and remembered as the active one.
 *
 * @param scheme the scheme to activate
 */
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
    // get rid of whatever was installed before wiring in the new scheme
    clearDisruptionScheme();

    scheme.applyToCluster(this);
    this.activeDisruptionScheme = scheme;
}
/**
 * Removes the active disruption scheme (if there is one) from this cluster and
 * leaves the cluster without an active scheme.
 */
public void clearDisruptionScheme() {
    final boolean schemeInstalled = activeDisruptionScheme != null;
    if (schemeInstalled) {
        activeDisruptionScheme.removeFromCluster(this);
    }
    activeDisruptionScheme = null;
}
/**
 * Applies the active disruption scheme to the given node. Does nothing when no
 * scheme is active. The node is expected to be registered in {@code nodes} already.
 */
private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) {
    if (activeDisruptionScheme == null) {
        return;
    }
    assert nodes.containsKey(nodeAndClient.name);
    activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
/**
 * Removes the active disruption scheme from the given node. Does nothing when no
 * scheme is active. The node is expected to still be registered in {@code nodes}.
 */
private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) {
    if (activeDisruptionScheme == null) {
        return;
    }
    assert nodes.containsKey(nodeAndClient.name);
    activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
/**
 * Returns the nodes of this cluster that are accepted by {@link DataNodePredicate}.
 */
private synchronized Collection<NodeAndClient> dataNodeAndClients() {
    final Collection<NodeAndClient> allNodes = nodes.values();
    return Collections2.filter(allNodes, new DataNodePredicate());
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;

View File

@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService;
import java.util.Random;
import java.util.Set;
/**
 * A {@link NetworkPartition} that does not cut communication between the two sides of the
 * partition but delays it. When disruption starts, a delay is picked at random from
 * {@code [delayMin, delayMax)} and an "unresponsive for that duration" rule is installed
 * in both directions between every pair of nodes on opposite sides.
 */
public class NetworkDelaysPartition extends NetworkPartition {

    // Default bounds of the random delay. They are fed to TimeValue(long), presumably milliseconds.
    static long DEFAULT_DELAY_MIN = 10000;
    static long DEFAULT_DELAY_MAX = 90000;

    final long delayMin;
    final long delayMax;

    /** The delay chosen by the most recent call to {@link #startDisrupting()}; null before that. */
    TimeValue duration;

    public NetworkDelaysPartition(Random random) {
        this(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);
    }

    public NetworkDelaysPartition(Random random, long delayMin, long delayMax) {
        super(random);
        this.delayMin = delayMin;
        this.delayMax = delayMax;
    }

    public NetworkDelaysPartition(String node1, String node2, Random random) {
        this(node1, node2, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
    }

    public NetworkDelaysPartition(String node1, String node2, long delayMin, long delayMax, Random random) {
        super(node1, node2, random);
        this.delayMin = delayMin;
        this.delayMax = delayMax;
    }

    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
        this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
    }

    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, long delayMin, long delayMax, Random random) {
        super(nodesSideOne, nodesSideTwo, random);
        this.delayMin = delayMin;
        this.delayMax = delayMax;
    }

    /**
     * Picks the delay for this round of disruption and then installs the rules via the
     * parent implementation.
     */
    @Override
    public synchronized void startDisrupting() {
        final long spread = delayMax - delayMin;
        // Random.nextInt rejects a bound of 0, so a fixed delay (min == max) is handled explicitly.
        final long extra = spread == 0 ? 0 : random.nextInt((int) spread);
        duration = new TimeValue(delayMin + extra);
        super.startDisrupting();
    }

    /**
     * Delays traffic in both directions: node1's transport is slowed towards node2 and
     * node2's transport is slowed towards node1.
     */
    @Override
    void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
                         DiscoveryNode node2, MockTransportService transportService2) {
        transportService1.addUnresponsiveRule(node2, duration);
        transportService2.addUnresponsiveRule(node1, duration);
    }

    @Override
    protected String getPartitionDescription() {
        return "network delays for [" + duration + "]";
    }
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.transport.MockTransportService;
import java.util.Random;
import java.util.Set;
/**
 * A {@link NetworkPartition} in which nodes on opposite sides fail to send to each other
 * with a "no connection" style failure.
 */
public class NetworkDisconnectPartition extends NetworkPartition {

    public NetworkDisconnectPartition(Random random) {
        super(random);
    }

    public NetworkDisconnectPartition(String node1, String node2, Random random) {
        super(node1, node2, random);
    }

    public NetworkDisconnectPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
        super(nodesSideOne, nodesSideTwo, random);
    }

    /**
     * Installs a fail-to-send rule in each direction between the two nodes.
     */
    @Override
    void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
                         DiscoveryNode node2, MockTransportService transportService2) {
        // side one -> side two
        transportService1.addFailToSendNoConnectRule(node2);
        // side two -> side one
        transportService2.addFailToSendNoConnectRule(node1);
    }

    @Override
    protected String getPartitionDescription() {
        return "disconnected";
    }
}

View File

@ -0,0 +1,199 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
/**
 * Base class for disruption schemes that split the cluster's nodes into two sides and
 * disturb the transport-level communication between every pair of nodes on opposite sides.
 * Subclasses decide what the disturbance is by implementing
 * {@link #applyDisruption(DiscoveryNode, MockTransportService, DiscoveryNode, MockTransportService)}.
 * <p>
 * When created with only a {@link Random}, nodes are assigned to sides automatically as they
 * are applied to the scheme ("auto expand"). When the sides are given explicitly, the
 * assignment is fixed.
 */
public abstract class NetworkPartition implements ServiceDisruptionScheme {

    protected final ESLogger logger = Loggers.getLogger(getClass());

    final Set<String> nodesSideOne;
    final Set<String> nodesSideTwo;

    /** Whether nodes that are not yet on a side get assigned to one in {@link #applyToNode}. */
    volatile boolean autoExpand;

    protected final Random random;

    /** The cluster this scheme was last applied to; set in {@link #applyToCluster}. */
    protected volatile InternalTestCluster cluster;

    public NetworkPartition(Random random) {
        // derive a private generator so this scheme does not keep consuming the caller's one
        this.random = new Random(random.nextLong());
        nodesSideOne = new HashSet<>();
        nodesSideTwo = new HashSet<>();
        autoExpand = true;
    }

    public NetworkPartition(String node1, String node2, Random random) {
        this(random);
        nodesSideOne.add(node1);
        nodesSideTwo.add(node2);
        autoExpand = false;
    }

    public NetworkPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
        this(random);
        this.nodesSideOne.addAll(nodesSideOne);
        this.nodesSideTwo.addAll(nodesSideTwo);
        autoExpand = false;
    }

    /** Returns an immutable snapshot of the node names on side one. */
    public List<String> getNodesSideOne() {
        return ImmutableList.copyOf(nodesSideOne);
    }

    /** Returns an immutable snapshot of the node names on side two. */
    public List<String> getNodesSideTwo() {
        return ImmutableList.copyOf(nodesSideTwo);
    }

    /**
     * Returns the side with more nodes; side one wins a tie.
     */
    public List<String> getMajoritySide() {
        if (nodesSideOne.size() >= nodesSideTwo.size()) {
            return getNodesSideOne();
        } else {
            return getNodesSideTwo();
        }
    }

    /**
     * Misspelled predecessor of {@link #getMajoritySide()}, kept so existing callers compile.
     *
     * @deprecated use {@link #getMajoritySide()}
     */
    @Deprecated
    public List<String> getMjaoritySide() {
        return getMajoritySide();
    }

    /**
     * Returns the side with fewer nodes; side two is returned on a tie.
     */
    public List<String> getMinoritySide() {
        if (nodesSideOne.size() >= nodesSideTwo.size()) {
            return getNodesSideTwo();
        } else {
            return getNodesSideOne();
        }
    }

    @Override
    public void applyToCluster(InternalTestCluster cluster) {
        this.cluster = cluster;
        if (autoExpand) {
            for (String node : cluster.getNodeNames()) {
                applyToNode(node, cluster);
            }
        }
    }

    @Override
    public void removeFromCluster(InternalTestCluster cluster) {
        stopDisrupting();
    }

    /**
     * Assigns the node to a side when auto expansion is on and the node is not assigned yet.
     * An empty side is filled first; otherwise the side is chosen at random.
     */
    @Override
    public synchronized void applyToNode(String node, InternalTestCluster cluster) {
        if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) {
            return;
        }
        if (nodesSideOne.isEmpty()) {
            nodesSideOne.add(node);
        } else if (nodesSideTwo.isEmpty()) {
            nodesSideTwo.add(node);
        } else if (random.nextBoolean()) {
            nodesSideOne.add(node);
        } else {
            nodesSideTwo.add(node);
        }
    }

    /**
     * Clears the disruption rules between the given node and every node on the opposite side.
     * Nodes that are on neither side are ignored without touching the cluster.
     */
    @Override
    public synchronized void removeFromNode(String node, InternalTestCluster cluster) {
        // Decide membership first so that no service lookup happens for nodes outside the partition.
        final Set<String> otherSideNodes;
        if (nodesSideOne.contains(node)) {
            otherSideNodes = nodesSideTwo;
        } else if (nodesSideTwo.contains(node)) {
            otherSideNodes = nodesSideOne;
        } else {
            return;
        }
        MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node);
        DiscoveryNode discoveryNode = discoveryNode(node);
        for (String node2 : otherSideNodes) {
            MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
            DiscoveryNode discoveryNode2 = discoveryNode(node2);
            removeDisruption(discoveryNode, transportService, discoveryNode2, transportService2);
        }
    }

    @Override
    public synchronized void testClusterClosed() {

    }

    /** Short human readable description of the partition type, used in log messages. */
    protected abstract String getPartitionDescription();

    /** Resolves the {@link DiscoveryNode} of the named node through its {@link Discovery} instance. */
    protected DiscoveryNode discoveryNode(String node) {
        return cluster.getInstance(Discovery.class, node).localNode();
    }

    /**
     * Applies the disruption between every pair of nodes on opposite sides.
     * Does nothing while either side is empty.
     */
    @Override
    public synchronized void startDisrupting() {
        if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) {
            return;
        }
        logger.info("nodes {} will be partitioned from {}. partition type [{}]", nodesSideOne, nodesSideTwo, getPartitionDescription());
        for (String node1 : nodesSideOne) {
            MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
            DiscoveryNode discoveryNode1 = discoveryNode(node1);
            for (String node2 : nodesSideTwo) {
                DiscoveryNode discoveryNode2 = discoveryNode(node2);
                MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
                applyDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2);
            }
        }
    }

    /**
     * Clears the disruption between every pair of nodes on opposite sides.
     * Does nothing while either side is empty.
     */
    // NOTE(review): unlike startDisrupting() this method is not synchronized - confirm that is intended.
    @Override
    public void stopDisrupting() {
        if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) {
            return;
        }
        logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo);
        for (String node1 : nodesSideOne) {
            MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
            DiscoveryNode discoveryNode1 = discoveryNode(node1);
            for (String node2 : nodesSideTwo) {
                DiscoveryNode discoveryNode2 = discoveryNode(node2);
                MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
                removeDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2);
            }
        }
    }

    /**
     * Installs the subclass specific disruption between the two given nodes.
     */
    abstract void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
                                  DiscoveryNode node2, MockTransportService transportService2);

    /**
     * Clears the transport rules the two nodes hold for each other.
     */
    protected void removeDisruption(DiscoveryNode node1, MockTransportService transportService1,
                                    DiscoveryNode node2, MockTransportService transportService2) {
        transportService1.clearRule(node2);
        transportService2.clearRule(node1);
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.transport.MockTransportService;
import java.util.Random;
import java.util.Set;
/**
 * A {@link NetworkPartition} in which nodes on opposite sides are made unresponsive
 * to each other.
 */
public class NetworkUnresponsivePartition extends NetworkPartition {

    public NetworkUnresponsivePartition(Random random) {
        super(random);
    }

    public NetworkUnresponsivePartition(String node1, String node2, Random random) {
        super(node1, node2, random);
    }

    public NetworkUnresponsivePartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
        super(nodesSideOne, nodesSideTwo, random);
    }

    /**
     * Installs an unresponsive rule in each direction between the two nodes.
     */
    @Override
    void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
                         DiscoveryNode node2, MockTransportService transportService2) {
        // side one -> side two
        transportService1.addUnresponsiveRule(node2);
        // side two -> side one
        transportService2.addUnresponsiveRule(node1);
    }

    @Override
    protected String getPartitionDescription() {
        return "unresponsive";
    }
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.test.InternalTestCluster;
/**
 * A {@link ServiceDisruptionScheme} in which every callback is a no-op.
 */
public class NoOpDisruptionScheme implements ServiceDisruptionScheme {

    // ---- cluster level callbacks ----

    @Override
    public void applyToCluster(InternalTestCluster cluster) {
        // intentionally empty
    }

    @Override
    public void removeFromCluster(InternalTestCluster cluster) {
        // intentionally empty
    }

    @Override
    public void testClusterClosed() {
        // intentionally empty
    }

    // ---- node level callbacks ----

    @Override
    public void applyToNode(String node, InternalTestCluster cluster) {
        // intentionally empty
    }

    @Override
    public void removeFromNode(String node, InternalTestCluster cluster) {
        // intentionally empty
    }

    // ---- disruption control ----

    @Override
    public void startDisrupting() {
        // intentionally empty
    }

    @Override
    public void stopDisrupting() {
        // intentionally empty
    }
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
/**
 * A disruption that can be installed on an {@link InternalTestCluster} and switched on and off.
 */
public interface ServiceDisruptionScheme {

    /** Called when this scheme is installed on the given cluster. */
    void applyToCluster(InternalTestCluster cluster);

    /** Called when this scheme is removed from the given cluster. */
    void removeFromCluster(InternalTestCluster cluster);

    /** Called for a single node (identified by name) that should become subject to this scheme. */
    void applyToNode(String node, InternalTestCluster cluster);

    /** Called for a single node (identified by name) that should no longer be subject to this scheme. */
    void removeFromNode(String node, InternalTestCluster cluster);

    /** Starts the actual disruption. */
    void startDisrupting();

    /** Stops the actual disruption. */
    void stopDisrupting();

    /** Presumably invoked once the test cluster has been closed - TODO confirm against the caller. */
    void testClusterClosed();
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.InternalTestCluster;
import java.util.Random;
/**
 * Base class for disruption schemes that target exactly one node. The node is either given
 * up front or picked at random from the cluster when the scheme is applied to it.
 */
public abstract class SingleNodeDisruption implements ServiceDisruptionScheme {

    protected final ESLogger logger = Loggers.getLogger(getClass());

    /** Name of the node being disrupted, or null when none is selected. */
    protected volatile String disruptedNode;
    protected volatile InternalTestCluster cluster;
    protected final Random random;

    public SingleNodeDisruption(Random random) {
        // private generator seeded from the caller's one
        this.random = new Random(random.nextLong());
    }

    public SingleNodeDisruption(String disruptedNode, Random random) {
        this(random);
        this.disruptedNode = disruptedNode;
    }

    /**
     * Remembers the cluster and, when no node was selected yet, picks one of its nodes at random.
     */
    @Override
    public void applyToCluster(InternalTestCluster cluster) {
        this.cluster = cluster;
        if (disruptedNode != null) {
            return;
        }
        final String[] candidates = cluster.getNodeNames();
        final int pick = random.nextInt(candidates.length);
        disruptedNode = candidates[pick];
    }

    @Override
    public void removeFromCluster(InternalTestCluster cluster) {
        if (disruptedNode == null) {
            return;
        }
        removeFromNode(disruptedNode, cluster);
    }

    @Override
    public synchronized void applyToNode(String node, InternalTestCluster cluster) {
        // nothing to do per node
    }

    /**
     * Stops the disruption and forgets the selected node, but only when {@code node} is the
     * node currently being disrupted.
     */
    @Override
    public synchronized void removeFromNode(String node, InternalTestCluster cluster) {
        final boolean isDisruptedNode = disruptedNode != null && node.equals(disruptedNode);
        if (isDisruptedNode) {
            stopDisrupting();
            disruptedNode = null;
        }
    }

    @Override
    public synchronized void testClusterClosed() {
        disruptedNode = null;
    }
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Random;
/**
 * A {@link SingleNodeDisruption} that slows down cluster state processing on the disrupted
 * node. While disrupting, a background thread repeatedly submits a cluster state update
 * task that merely sleeps for a random duration in {@code [delayDurationMin, delayDurationMax)},
 * then pauses for a random interval in
 * {@code [intervalBetweenDelaysMin, intervalBetweenDelaysMax)} before the next round.
 */
public class SlowClusterStateProcessing extends SingleNodeDisruption {

    volatile boolean disrupting;
    /** The background thread; null when the disruption is not running. */
    volatile Thread worker;

    final long intervalBetweenDelaysMin;
    final long intervalBetweenDelaysMax;
    final long delayDurationMin;
    final long delayDurationMax;

    public SlowClusterStateProcessing(Random random) {
        this(null, random);
    }

    public SlowClusterStateProcessing(String disruptedNode, Random random) {
        this(disruptedNode, random, 100, 200, 300, 20000);
    }

    public SlowClusterStateProcessing(String disruptedNode, Random random, long intervalBetweenDelaysMin,
                                      long intervalBetweenDelaysMax, long delayDurationMin, long delayDurationMax) {
        this(random, intervalBetweenDelaysMin, intervalBetweenDelaysMax, delayDurationMin, delayDurationMax);
        this.disruptedNode = disruptedNode;
    }

    public SlowClusterStateProcessing(Random random,
                                      long intervalBetweenDelaysMin, long intervalBetweenDelaysMax, long delayDurationMin,
                                      long delayDurationMax) {
        super(random);
        this.intervalBetweenDelaysMin = intervalBetweenDelaysMin;
        this.intervalBetweenDelaysMax = intervalBetweenDelaysMax;
        this.delayDurationMin = delayDurationMin;
        this.delayDurationMax = delayDurationMax;
    }

    /** Starts the background thread that keeps delaying cluster state processing. */
    @Override
    public void startDisrupting() {
        disrupting = true;
        worker = new Thread(new BackgroundWorker());
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Signals the background thread to stop and waits (bounded) for it to finish.
     * Safe to call when the disruption was never started or was already stopped.
     */
    @Override
    public void stopDisrupting() {
        disrupting = false;
        final Thread runningWorker = worker;
        if (runningWorker == null) {
            // Never started or already stopped. This happens when the scheme is removed from
            // the cluster or node after an explicit stopDisrupting().
            return;
        }
        try {
            runningWorker.join(2 * (intervalBetweenDelaysMax + delayDurationMax));
        } catch (InterruptedException e) {
            logger.info("background thread failed to stop");
        }
        worker = null;
    }

    /**
     * Submits a cluster state update task on the disrupted node that sleeps for {@code duration}.
     *
     * @return false when there is no disrupted node (nothing was submitted), true otherwise
     */
    private synchronized boolean interruptClusterStateProcessing(final TimeValue duration) {
        if (disruptedNode == null) {
            return false;
        }
        logger.info("delaying cluster state updates on node [{}] for [{}]", disruptedNode, duration);
        ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptedNode);
        clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {

            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                // the delay itself: occupy the update task for the requested time
                Thread.sleep(duration.millis());
                return currentState;
            }

            @Override
            public void onFailure(String source, Throwable t) {

            }
        });
        return true;
    }

    /**
     * Returns a random value in {@code [min, max)}, or {@code min} when the range is empty.
     * {@link Random#nextInt(int)} rejects non-positive bounds, hence the explicit guard.
     */
    private long randomBetween(long min, long max) {
        final long spread = max - min;
        return spread > 0 ? min + random.nextInt((int) spread) : min;
    }

    class BackgroundWorker implements Runnable {

        @Override
        public void run() {
            // Leaving the loop as soon as there is no disrupted node avoids a busy spin.
            while (disrupting && disruptedNode != null) {
                try {
                    TimeValue duration = new TimeValue(randomBetween(delayDurationMin, delayDurationMax));
                    if (!interruptClusterStateProcessing(duration)) {
                        continue;
                    }
                    // wait for roughly as long as the submitted task blocks the node
                    Thread.sleep(duration.millis());
                    if (disruptedNode == null) {
                        return;
                    }
                    // pause between two delays, as configured by intervalBetweenDelays*
                    final long pause = randomBetween(intervalBetweenDelaysMin, intervalBetweenDelaysMax);
                    if (pause > 0 && disrupting) {
                        Thread.sleep(pause);
                    }
                } catch (Exception e) {
                    logger.error("error in background worker", e);
                }
            }
        }
    }
}

View File

@ -24,9 +24,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -46,6 +51,7 @@ public class MockTransportService extends TransportService {
// Hands a LookupTestTransport wrapping the given transport to the parent TransportService
// and keeps the unwrapped transport in `original`.
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, new LookupTestTransport(transport), threadPool);
this.original = transport;
}
/**
@ -97,7 +103,7 @@ public class MockTransportService extends TransportService {
*/
public void addFailToSendNoConnectRule(DiscoveryNode node, final Set<String> blockedActions) {
((LookupTestTransport) transport).transports.put(node.getAddress(), new DelegateTransport(original) {
addDelegate(node, new DelegateTransport(original) {
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
original.connectToNode(node);
@ -124,7 +130,6 @@ public class MockTransportService extends TransportService {
* and failing to connect once the rule was added.
*/
public void addUnresponsiveRule(DiscoveryNode node) {
// TODO add a parameter to delay the connect timeout?
addDelegate(node, new DelegateTransport(original) {
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
@ -143,8 +148,98 @@ public class MockTransportService extends TransportService {
});
}
/**
 * Adds a rule that delays connecting and sending to the given node, simulating a node that
 * is unresponsive for a limited time. The delay is counted from the moment the rule is added;
 * once {@code duration} has elapsed, calls are passed straight through to the original transport.
 * <p>
 * While the delay is running:
 * <ul>
 * <li>connect attempts wait for the remaining delay and then connect, unless the remaining
 * delay is not shorter than the default TCP connect timeout, in which case they wait for
 * that timeout and fail with a {@link ConnectTransportException};</li>
 * <li>send requests are cloned and scheduled on the generic thread pool to be sent once the
 * remaining delay has passed.</li>
 * </ul>
 *
 * @param node     the node to which communication is delayed
 * @param duration the amount of time to delay sending and connecting.
 */
public void addUnresponsiveRule(DiscoveryNode node, final TimeValue duration) {
    final long startTime = System.currentTimeMillis();

    addDelegate(node, new DelegateTransport(original) {

        /** Remaining delay; zero or negative once {@code duration} has elapsed. */
        TimeValue getDelay() {
            return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
        }

        @Override
        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
            TimeValue delay = getDelay();
            if (delay.millis() <= 0) {
                original.connectToNode(node);
                // The delay is over. Without this return the code below would sleep with a
                // non-positive value (Thread.sleep rejects negatives) and connect a second time.
                return;
            }

            // TODO: Replace with proper setting
            TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
            try {
                if (delay.millis() < connectingTimeout.millis()) {
                    Thread.sleep(delay.millis());
                    original.connectToNode(node);
                } else {
                    Thread.sleep(connectingTimeout.millis());
                    throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
                }
            } catch (InterruptedException e) {
                throw new ConnectTransportException(node, "UNRESPONSIVE: interrupted while sleeping", e);
            }
        }

        @Override
        public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
            TimeValue delay = getDelay();
            if (delay.millis() <= 0) {
                original.connectToNodeLight(node);
                // delay is over - do not fall through to the delayed path
                return;
            }

            // TODO: Replace with proper setting
            TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
            try {
                if (delay.millis() < connectingTimeout.millis()) {
                    Thread.sleep(delay.millis());
                    original.connectToNodeLight(node);
                } else {
                    Thread.sleep(connectingTimeout.millis());
                    throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
                }
            } catch (InterruptedException e) {
                throw new ConnectTransportException(node, "UNRESPONSIVE: interrupted while sleeping", e);
            }
        }

        @Override
        public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request, final TransportRequestOptions options) throws IOException, TransportException {
            // delayed sending - even if larger than the request timeout, to simulate a potential late response from the target node

            TimeValue delay = getDelay();
            if (delay.millis() <= 0) {
                original.sendRequest(node, requestId, action, request, options);
                // delay is over - without this return the request would be sent a second time below
                return;
            }

            // poor mans request cloning...
            TransportRequestHandler handler = MockTransportService.this.getHandler(action);
            BytesStreamOutput bStream = new BytesStreamOutput();
            request.writeTo(bStream);
            final TransportRequest clonedRequest = handler.newInstance();
            clonedRequest.readFrom(new BytesStreamInput(bStream.bytes()));

            threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
                @Override
                public void run() {
                    try {
                        original.sendRequest(node, requestId, action, clonedRequest, options);
                    } catch (Throwable e) {
                        logger.debug("failed to send delayed request", e);
                    }
                }
            });
        }
    });
}
/**
* Adds a new delegate transport that is used for communication with the given node.
*
* @return <tt>true</tt> iff no other delegate was registered for this node before, otherwise <tt>false</tt>
*/
public boolean addDelegate(DiscoveryNode node, DelegateTransport transport) {
@ -214,7 +309,6 @@ public class MockTransportService extends TransportService {
}
@Override
public void transportServiceAdapter(TransportServiceAdapter service) {
transport.transportServiceAdapter(service);