From c3e53f38899cd39e9f8a3b71829c6accb8b2240d Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 12 Nov 2013 13:29:51 -0500 Subject: [PATCH] Make mock repository blocking tests more deterministic and reproducible --- .../snapshots/AbstractSnapshotTests.java | 31 ++++++++-------- .../DedicatedClusterSnapshotRestoreTests.java | 33 +++++++---------- .../SharedClusterSnapshotRestoreTests.java | 35 ++++++++++-------- .../snapshots/mockstore/MockRepository.java | 37 ++++++++++++------- 4 files changed, 71 insertions(+), 65 deletions(-) diff --git a/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotTests.java b/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotTests.java index b6666d274ea..d1081fd4c18 100644 --- a/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotTests.java +++ b/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotTests.java @@ -31,7 +31,6 @@ import org.junit.Before; import org.junit.Ignore; import java.io.File; -import java.util.Collection; import static org.hamcrest.Matchers.equalTo; @@ -101,24 +100,17 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest }); } - public String waitForCompletionOrBlock(Collection nodes, String repository, String snapshot, TimeValue timeout) throws InterruptedException { + public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException { long start = System.currentTimeMillis(); + RepositoriesService repositoriesService = cluster().getInstance(RepositoriesService.class, node); + MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); while (System.currentTimeMillis() - start < timeout.millis()) { - ImmutableList snapshotInfos = client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot).get().getSnapshots(); - assertThat(snapshotInfos.size(), equalTo(1)); - if (snapshotInfos.get(0).state().completed()) { - return null; - } - for (String node : nodes) { - RepositoriesService repositoriesService = cluster().getInstance(RepositoriesService.class, node); - if (((MockRepository) repositoriesService.repository(repository)).blocked()) { - return node; - } + if (mockRepository.blocked()) { + return; } Thread.sleep(100); } fail("Timeout!!!"); - return null; } public SnapshotInfo waitForCompletion(String repository, String snapshot, TimeValue timeout) throws InterruptedException { @@ -135,9 +127,16 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest return null; } - public static void unblock(String repository) { - for (RepositoriesService repositoriesService : cluster().getInstances(RepositoriesService.class)) { - ((MockRepository) repositoriesService.repository(repository)).unblock(); + public static String blockNodeWithIndex(String index) { + for(String node : cluster().nodesInclude("test-idx")) { + ((MockRepository)cluster().getInstance(RepositoriesService.class, node).repository("test-repo")).blockOnDataFiles(true); + return node; } + fail("No nodes for the index " + index + " found"); + return null; + } + + public static void unblockNode(String node) { + ((MockRepository)cluster().getInstance(RepositoriesService.class, node).repository("test-repo")).unblock(); } } diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java index fd5a5a0bd26..ecf6c2affef 100644 --- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.snapshots; import com.carrotsearch.randomizedtesting.LifecycleScope; -import com.google.common.collect.ImmutableList; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; @@ -106,31 +105,27 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests ImmutableSettings.settingsBuilder() .put("location", newTempDir(LifecycleScope.TEST)) .put("random", randomAsciiOfLength(10)) - .put("random_data_file_blocking_rate", 0.1) .put("wait_after_unblock", 200) ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + // Pick one node and block it + String blockedNode = blockNodeWithIndex("test-idx"); + logger.info("--> snapshot"); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); - String blockedNode = waitForCompletionOrBlock(nodes, "test-repo", "test-snap", TimeValue.timeValueSeconds(60)); - if (blockedNode != null) { - logger.info("--> execution was blocked on node [{}], shutting it down", blockedNode); - unblock("test-repo"); - logger.info("--> stopping node", blockedNode); - stopNode(blockedNode); - logger.info("--> waiting for completion"); - SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60)); - logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); - logger.info("--> done"); - } else { - logger.info("--> done without blocks"); - ImmutableList snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots(); - assertThat(snapshotInfos.size(), equalTo(1)); - assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0)); + logger.info("--> waiting for block to kick in"); + waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); - } + logger.info("--> execution was blocked on node [{}], shutting it down", blockedNode); + unblockNode(blockedNode); + + logger.info("--> stopping node", blockedNode); + stopNode(blockedNode); + logger.info("--> waiting for completion"); + SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60)); + logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); + logger.info("--> done"); } } diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index 361af8d9ac1..5fd3f8e1301 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -665,7 +665,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { @Test @TestLogging("cluster.routing.allocation.decider:TRACE") -// @LuceneTestCase.AwaitsFix(bugUrl="imotov is working on the fix") public void moveShardWhileSnapshottingTest() throws Exception { Client client = client(); File repositoryLocation = newTempDir(LifecycleScope.TEST); @@ -675,7 +674,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { ImmutableSettings.settingsBuilder() .put("location", repositoryLocation) .put("random", randomAsciiOfLength(10)) - .put("random_data_file_blocking_rate", 0.1) .put("wait_after_unblock", 200) ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); @@ -690,23 +688,28 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { refresh(); assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + // Pick one node and block it + String blockedNode = blockNodeWithIndex("test-idx"); + logger.info("--> snapshot"); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); - String blockedNode = waitForCompletionOrBlock(cluster().nodesInclude("test-idx"), "test-repo", "test-snap", TimeValue.timeValueSeconds(60)); - if (blockedNode != null) { - logger.info("--> move shards away from the node"); - ImmutableSettings.Builder excludeSettings = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", blockedNode); - client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get(); - logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode); - unblock("test-repo"); - logger.info("--> waiting for completion"); - SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60)); - logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); - logger.info("--> done"); - } else { - logger.info("--> done without blocks"); - } + + logger.info("--> waiting for block to kick in"); + waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + + logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode); + ImmutableSettings.Builder excludeSettings = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", blockedNode); + client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get(); + + logger.info("--> unblocking blocked node"); + unblockNode(blockedNode); + logger.info("--> waiting for completion"); + SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); + logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); + logger.info("--> done"); + ImmutableList snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots(); + assertThat(snapshotInfos.size(), equalTo(1)); assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0)); diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 5ebe185151a..39f5842a132 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -46,10 +46,6 @@ public class MockRepository extends FsRepository { private final AtomicLong failureCounter = new AtomicLong(); - private volatile boolean blockable = true; - - private volatile boolean blocked = false; - public void resetFailureCount() { failureCounter.set(0); } @@ -62,23 +58,25 @@ public class MockRepository extends FsRepository { private final double randomDataFileIOExceptionRate; - private final double randomControlBlockingRate; - - private final double randomDataFileBlockingRate; - private final long waitAfterUnblock; private final MockBlobStore mockBlobStore; private final String randomPrefix; + private volatile boolean blockOnControlFiles; + + private volatile boolean blockOnDataFiles; + + private volatile boolean blocked = false; + @Inject public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { super(name, repositorySettings, indexShardRepository); randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); - randomControlBlockingRate = repositorySettings.settings().getAsDouble("random_control_blocking_rate", 0.0); - randomDataFileBlockingRate = repositorySettings.settings().getAsDouble("random_data_file_blocking_rate", 0.0); + blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false); + blockOnDataFiles = repositorySettings.settings().getAsBoolean("block_on_data", false); randomPrefix = repositorySettings.settings().get("random"); waitAfterUnblock = repositorySettings.settings().getAsLong("wait_after_unblock", 0L); logger.info("starting mock repository with random prefix " + randomPrefix); @@ -108,6 +106,14 @@ public class MockRepository extends FsRepository { mockBlobStore.unblockExecution(); } + public void blockOnDataFiles(boolean blocked) { + blockOnDataFiles = blocked; + } + + public void blockOnControlFiles(boolean blocked) { + blockOnControlFiles = blocked; + } + public class MockBlobStore extends BlobStoreWrapper { ConcurrentMap accessCounts = new ConcurrentHashMap(); @@ -133,8 +139,10 @@ public class MockRepository extends FsRepository { public synchronized void unblockExecution() { if (blocked) { - blockable = false; blocked = false; + // Clean blocking flags, so we wouldn't try to block again + blockOnDataFiles = false; + blockOnControlFiles = false; this.notifyAll(); } } @@ -146,12 +154,13 @@ public class MockRepository extends FsRepository { private synchronized boolean blockExecution() { boolean wasBlocked = false; try { - while (blockable) { + while (blockOnDataFiles || blockOnControlFiles) { blocked = true; this.wait(); wasBlocked = true; } } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } return wasBlocked; } @@ -190,7 +199,7 @@ public class MockRepository extends FsRepository { logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path()); addFailure(); throw new IOException("Random IOException"); - } else if (shouldFail(blobName, randomDataFileBlockingRate)) { + } else if (blockOnDataFiles) { logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path()); if (blockExecution() && waitAfterUnblock > 0) { try { @@ -207,7 +216,7 @@ public class MockRepository extends FsRepository { logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path()); addFailure(); throw new IOException("Random IOException"); - } else if (shouldFail(blobName, randomControlBlockingRate)) { + } else if (blockOnControlFiles) { logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path()); if (blockExecution() && waitAfterUnblock > 0) { try {