[TEST] Renamed afterDisruption timeout to expectedTimeToHeal

Accumulate expected shard failures to log later
Martijn van Groningen 2014-06-23 14:04:54 +02:00 committed by Boaz Leskes
parent 785d0e55ab
commit f7b962a417
10 changed files with 108 additions and 97 deletions
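
The rename is a semantic change, not just a new name: afterDisruptionTimeOut() bundled each scheme's healing time together with a fixed 30-second grace period, while expectedTimeToHeal() reports only how long the scheme itself needs before the cluster can recover, leaving callers to add their own safety margin (in the test below: a 30s base plus the heal time once per expected document). A minimal sketch of the resulting contract, with simplified stand-in types instead of the real Elasticsearch test classes and TimeValue reduced to plain milliseconds:

// Stand-in for ServiceDisruptionScheme; times are millisecond longs
// rather than TimeValue for brevity.
interface DisruptionScheme {
    // Renamed from afterDisruptionTimeOut(): only the scheme's own healing
    // time, with no built-in grace period.
    long expectedTimeToHealMillis();
}

// Stand-in for NetworkDelaysPartition: healed once the longest injected
// delay has elapsed.
class DelayingScheme implements DisruptionScheme {
    private final long delayMaxMillis;

    DelayingScheme(long delayMaxMillis) {
        this.delayMaxMillis = delayMaxMillis;
    }

    @Override
    public long expectedTimeToHealMillis() {
        return delayMaxMillis; // was: delayMax + a baked-in 30 seconds
    }
}

public class TimeoutSizing {
    public static void main(String[] args) {
        DisruptionScheme scheme = new DelayingScheme(5000);
        int docsPerIndexer = 3;
        int indexers = 3;
        // Callers now add their own margin, mirroring the await in the test:
        // 30000 + expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size())
        long awaitMillis = 30000 + scheme.expectedTimeToHealMillis() * (docsPerIndexer * indexers);
        System.out.println("await up to " + awaitMillis + " ms");
    }
}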

View File

@@ -20,7 +20,6 @@
 package org.elasticsearch.discovery;
 
 import com.google.common.base.Predicate;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.get.GetResponse;
@@ -365,7 +364,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
     }
 
     @Test
-    @LuceneTestCase.AwaitsFix(bugUrl = "MvG will fix")
+    // @LuceneTestCase.AwaitsFix(bugUrl = "MvG will fix")
     @TestLogging("action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
     public void testAckedIndexing() throws Exception {
         final List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
@@ -374,7 +373,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
         assertAcked(prepareCreate("test")
                 .setSettings(ImmutableSettings.builder()
                         .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
-                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1 + randomInt(1))
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
                 ));
         ensureGreen();
 
@@ -388,99 +387,104 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
         List<Semaphore> semaphores = new ArrayList<>(nodes.size());
         final AtomicInteger idGenerator = new AtomicInteger(0);
         final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
-        logger.info("starting indexers");
-        for (final String node : nodes) {
-            final Semaphore semaphore = new Semaphore(0);
-            semaphores.add(semaphore);
-            final Client client = client(node);
-            final String name = "indexer_" + indexers.size();
-            final int numPrimaries = getNumShards("test").numPrimaries;
-            Thread thread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    while (!stop.get()) {
-                        String id = null;
-                        try {
-                            if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
-                                continue;
-                            }
-                            logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
-                            try {
-                                id = Integer.toString(idGenerator.incrementAndGet());
-                                int shard = ((InternalTestCluster) cluster()).getInstance(DjbHashFunction.class).hash(id) % numPrimaries;
-                                logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
-                                IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get();
-                                assertThat(response.getVersion(), equalTo(1l));
-                                ackedDocs.put(id, node);
-                                logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
-                            } finally {
-                                countDownLatchRef.get().countDown();
-                                logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
-                            }
-                        } catch (ElasticsearchException | InterruptedException e) {
-                            logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node);
-                        } catch (Throwable t) {
-                            logger.info("unexpected exception in background thread of [{}]", t, node);
-                        }
-                    }
-                }
-            });
-            thread.setName(name);
-            thread.setDaemon(true);
-            thread.start();
-            indexers.add(thread);
-        }
-
-        int docsPerIndexer = randomInt(3);
-        logger.info("indexing " + docsPerIndexer + " docs per indexer before partition");
-        countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
-        for (Semaphore semaphore : semaphores) {
-            semaphore.release(docsPerIndexer);
-        }
-        assertTrue(countDownLatchRef.get().await(1, TimeUnit.MINUTES));
-
-        for (int iter = 1 + randomInt(2); iter > 0; iter--) {
-            logger.info("starting disruptions & indexing (iteration [{}])", iter);
-            disruptionScheme.startDisrupting();
-
-            docsPerIndexer = 1 + randomInt(5);
-            logger.info("indexing " + docsPerIndexer + " docs per indexer during partition");
-            countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
-            Collections.shuffle(semaphores);
-            for (Semaphore semaphore : semaphores) {
-                assertThat(semaphore.availablePermits(), equalTo(0));
-                semaphore.release(docsPerIndexer);
-            }
-            assertTrue(countDownLatchRef.get().await(disruptionScheme.afterDisruptionTimeOut().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS));
-
-            logger.info("stopping disruption");
-            disruptionScheme.stopDisrupting();
-            ensureStableCluster(3, disruptionScheme.afterDisruptionTimeOut());
-            ensureGreen("test");
-
-            logger.info("validating successful docs");
-            for (String node : nodes) {
-                try {
-                    logger.debug("validating through node [{}]", node);
-                    for (String id : ackedDocs.keySet()) {
-                        assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
-                                client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
-                    }
-                } catch (AssertionError e) {
-                    throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
-                }
-            }
-            logger.info("done validating (iteration [{}])", iter);
-        }
-
-        logger.info("shutting down indexers");
-        stop.set(true);
-        for (Thread indexer : indexers) {
-            indexer.interrupt();
-            indexer.join(60000);
-        }
+        final List<Exception> exceptedExceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+        logger.info("starting indexers");
+        try {
+            for (final String node : nodes) {
+                final Semaphore semaphore = new Semaphore(0);
+                semaphores.add(semaphore);
+                final Client client = client(node);
+                final String name = "indexer_" + indexers.size();
+                final int numPrimaries = getNumShards("test").numPrimaries;
+                Thread thread = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        while (!stop.get()) {
+                            String id = null;
+                            try {
+                                if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
+                                    continue;
+                                }
+                                logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
+                                try {
+                                    id = Integer.toString(idGenerator.incrementAndGet());
+                                    int shard = ((InternalTestCluster) cluster()).getInstance(DjbHashFunction.class).hash(id) % numPrimaries;
+                                    logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
+                                    IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get();
+                                    assertThat(response.getVersion(), equalTo(1l));
+                                    ackedDocs.put(id, node);
+                                    logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
+                                } finally {
+                                    countDownLatchRef.get().countDown();
+                                    logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
+                                }
+                            } catch (ElasticsearchException | InterruptedException e) {
+                                exceptedExceptions.add(e);
+                                logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node);
+                            } catch (Throwable t) {
+                                logger.info("unexpected exception in background thread of [{}]", t, node);
+                            }
+                        }
+                    }
+                });
+                thread.setName(name);
+                thread.setDaemon(true);
+                thread.start();
+                indexers.add(thread);
+            }
+
+            int docsPerIndexer = randomInt(3);
+            logger.info("indexing " + docsPerIndexer + " docs per indexer before partition");
+            countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
+            for (Semaphore semaphore : semaphores) {
+                semaphore.release(docsPerIndexer);
+            }
+            assertTrue(countDownLatchRef.get().await(1, TimeUnit.MINUTES));
+
+            for (int iter = 1 + randomInt(2); iter > 0; iter--) {
+                logger.info("starting disruptions & indexing (iteration [{}])", iter);
+                disruptionScheme.startDisrupting();
+
+                docsPerIndexer = 1 + randomInt(5);
+                logger.info("indexing " + docsPerIndexer + " docs per indexer during partition");
+                countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
+                Collections.shuffle(semaphores);
+                for (Semaphore semaphore : semaphores) {
+                    assertThat(semaphore.availablePermits(), equalTo(0));
+                    semaphore.release(docsPerIndexer);
+                }
+                assertTrue(countDownLatchRef.get().await(30000 + disruptionScheme.expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS));
+
+                logger.info("stopping disruption");
+                disruptionScheme.stopDisrupting();
+                ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + 30000));
+                ensureGreen("test");
+
+                logger.info("validating successful docs");
+                for (String node : nodes) {
+                    try {
+                        logger.debug("validating through node [{}]", node);
+                        for (String id : ackedDocs.keySet()) {
+                            assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
+                                    client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
+                        }
+                    } catch (AssertionError e) {
+                        throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
+                    }
+                }
+                logger.info("done validating (iteration [{}])", iter);
+            }
+        } finally {
+            logger.debug("Excepted exception during disruption [{}]", exceptedExceptions);
+            logger.info("shutting down indexers");
+            stop.set(true);
+            for (Thread indexer : indexers) {
+                indexer.interrupt();
+                indexer.join(60000);
+            }
+        }
     }
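
The accumulation change in this file follows a simple pattern: exceptions that are expected while the network is disrupted (timeouts, interruptions) are collected in a synchronized list rather than reported one by one, and the batch is logged from a finally block so it surfaces even when the test fails midway. A self-contained sketch of the pattern, independent of the Elasticsearch test harness (the worker thread and exception are illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExpectedFailureLog {
    public static void main(String[] args) throws InterruptedException {
        // Synchronized list: several indexer threads may append concurrently.
        final List<Exception> expectedExceptions =
                Collections.synchronizedList(new ArrayList<Exception>());
        try {
            Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    // Record an expected failure instead of logging it immediately.
                    expectedExceptions.add(new RuntimeException("simulated timeout during partition"));
                }
            });
            worker.start();
            worker.join();
        } finally {
            // One consolidated report, emitted even if the test body threw.
            System.out.println("Expected exceptions during disruption: " + expectedExceptions);
        }
    }
}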

View File

@@ -907,7 +907,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
      * It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating
      * are now allocated and started.
      */
-    public ClusterHealthStatus ensureGreen(String... indices) {
+    public ClusterHealthStatus ensureGreen(String... indices) {
         ClusterHealthResponse actionGet = client().admin().cluster()
                 .health(Requests.clusterHealthRequest(indices).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
         if (actionGet.isTimedOut()) {

View File

@@ -86,7 +86,7 @@ public class NetworkDelaysPartition extends NetworkPartition {
     }
 
     @Override
-    public TimeValue afterDisruptionTimeOut() {
-        return TimeValue.timeValueMillis(delayMax + super.afterDisruptionTimeOut().millis());
+    public TimeValue expectedTimeToHeal() {
+        return TimeValue.timeValueMillis(delayMax);
     }
 }

View File

@@ -19,6 +19,7 @@
 package org.elasticsearch.test.disruption;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.transport.MockTransportService;
 
 import java.util.Random;
@@ -50,4 +51,9 @@ public class NetworkDisconnectPartition extends NetworkPartition
         transportService1.addFailToSendNoConnectRule(node2);
         transportService2.addFailToSendNoConnectRule(node1);
     }
+
+    @Override
+    public TimeValue expectedTimeToHeal() {
+        return TimeValue.timeValueSeconds(0);
+    }
 }

View File

@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -197,8 +196,4 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
         transportService2.clearRule(node1);
     }
 
-    @Override
-    public TimeValue afterDisruptionTimeOut() {
-        return TimeValue.timeValueSeconds(30);
-    }
 }

View File

@@ -19,6 +19,7 @@
 package org.elasticsearch.test.disruption;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.transport.MockTransportService;
 
 import java.util.Random;
@@ -49,4 +50,9 @@ public class NetworkUnresponsivePartition extends NetworkPartition
         transportService1.addUnresponsiveRule(node2);
         transportService2.addUnresponsiveRule(node1);
     }
+
+    @Override
+    public TimeValue expectedTimeToHeal() {
+        return TimeValue.timeValueSeconds(0);
+    }
 }

View File

@@ -60,7 +60,7 @@ public class NoOpDisruptionScheme implements ServiceDisruptionScheme {
     }
 
     @Override
-    public TimeValue afterDisruptionTimeOut() {
-        return TimeValue.timeValueSeconds(30);
+    public TimeValue expectedTimeToHeal() {
+        return TimeValue.timeValueSeconds(0);
     }
 }

View File

@@ -37,6 +37,6 @@ public interface ServiceDisruptionScheme {
 
     public void testClusterClosed();
 
-    public TimeValue afterDisruptionTimeOut();
+    public TimeValue expectedTimeToHeal();
 }

View File

@@ -20,7 +20,6 @@ package org.elasticsearch.test.disruption;
 
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.InternalTestCluster;
 
 import java.util.Random;
@@ -81,8 +80,4 @@ public abstract class SingleNodeDisruption implements ServiceDisruptionScheme {
         disruptedNode = null;
     }
 
-    @Override
-    public TimeValue afterDisruptionTimeOut() {
-        return TimeValue.timeValueSeconds(30);
-    }
 }

View File

@@ -107,6 +107,11 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
         return true;
     }
 
+    @Override
+    public TimeValue expectedTimeToHeal() {
+        return TimeValue.timeValueSeconds(delayDurationMax + intervalBetweenDelaysMax);
+    }
+
     class BackgroundWorker implements Runnable {
 
         @Override