Make mock repository blocking tests more deterministic and reproducible
parent 904c0abb3e
commit c3e53f3889
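The diff below drops the probabilistic blocking settings (random_data_file_blocking_rate, random_control_blocking_rate) in favour of explicit block/unblock helpers in AbstractSnapshotTests and explicit block_on_data / block_on_control flags in MockRepository. As a rough orientation before the hunks, here is a minimal sketch of the test flow the new helpers are meant to support. The helper, setting and type names are taken from the hunks below; the sketch class, its test method, and the assumption that a "test-idx" index and a mock "test-repo" repository already exist are illustrative only.

// Sketch only (not part of the commit): how the new helpers combine in a test.
// Assumes the "test-idx" index and the mock "test-repo" repository have already
// been created, as in the tests changed below; imports match those tests.
public class BlockedSnapshotFlowSketch extends AbstractSnapshotTests {

    @Test
    public void blockedSnapshotFlow() throws Exception {
        // Arm the block on one data node that holds a shard of "test-idx"
        String blockedNode = blockNodeWithIndex("test-idx");

        // Start the snapshot asynchronously; it will stall on the blocked node
        client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
                .setWaitForCompletion(false).setIndices("test-idx").get();

        // Deterministic rendezvous: returns only once MockRepository reports blocked()
        waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));

        // ... perform the disruptive action under test (stop the node, move shards away, ...) ...

        // Release the block and let the snapshot finish
        unblockNode(blockedNode);
        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60));
        logger.info("--> snapshot finished with [{}] failed shards", snapshotInfo.shardFailures().size());
    }
}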
AbstractSnapshotTests.java

@@ -31,7 +31,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 
 import java.io.File;
-import java.util.Collection;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -101,24 +100,17 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest
         });
     }
 
-    public String waitForCompletionOrBlock(Collection<String> nodes, String repository, String snapshot, TimeValue timeout) throws InterruptedException {
+    public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
         long start = System.currentTimeMillis();
+        RepositoriesService repositoriesService = cluster().getInstance(RepositoriesService.class, node);
+        MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
         while (System.currentTimeMillis() - start < timeout.millis()) {
-            ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot).get().getSnapshots();
-            assertThat(snapshotInfos.size(), equalTo(1));
-            if (snapshotInfos.get(0).state().completed()) {
-                return null;
-            }
-            for (String node : nodes) {
-                RepositoriesService repositoriesService = cluster().getInstance(RepositoriesService.class, node);
-                if (((MockRepository) repositoriesService.repository(repository)).blocked()) {
-                    return node;
-                }
-            }
+            if (mockRepository.blocked()) {
+                return;
+            }
             Thread.sleep(100);
         }
         fail("Timeout!!!");
-        return null;
     }
 
     public SnapshotInfo waitForCompletion(String repository, String snapshot, TimeValue timeout) throws InterruptedException {
@@ -135,9 +127,16 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest
         return null;
     }
 
-    public static void unblock(String repository) {
-        for (RepositoriesService repositoriesService : cluster().getInstances(RepositoriesService.class)) {
-            ((MockRepository) repositoriesService.repository(repository)).unblock();
-        }
-    }
+    public static String blockNodeWithIndex(String index) {
+        for (String node : cluster().nodesInclude(index)) {
+            ((MockRepository) cluster().getInstance(RepositoriesService.class, node).repository("test-repo")).blockOnDataFiles(true);
+            return node;
+        }
+        fail("No nodes for the index " + index + " found");
+        return null;
+    }
+
+    public static void unblockNode(String node) {
+        ((MockRepository) cluster().getInstance(RepositoriesService.class, node).repository("test-repo")).unblock();
+    }
 }
DedicatedClusterSnapshotRestoreTests.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.snapshots;
 
 import com.carrotsearch.randomizedtesting.LifecycleScope;
-import com.google.common.collect.ImmutableList;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@@ -106,31 +105,27 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
                         ImmutableSettings.settingsBuilder()
                                 .put("location", newTempDir(LifecycleScope.TEST))
                                 .put("random", randomAsciiOfLength(10))
-                                .put("random_data_file_blocking_rate", 0.1)
                                 .put("wait_after_unblock", 200)
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
 
+        // Pick one node and block it
+        String blockedNode = blockNodeWithIndex("test-idx");
+
         logger.info("--> snapshot");
         client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
 
-        String blockedNode = waitForCompletionOrBlock(nodes, "test-repo", "test-snap", TimeValue.timeValueSeconds(60));
-        if (blockedNode != null) {
-            logger.info("--> execution was blocked on node [{}], shutting it down", blockedNode);
-            unblock("test-repo");
-            logger.info("--> stopping node", blockedNode);
-            stopNode(blockedNode);
-            logger.info("--> waiting for completion");
-            SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60));
-            logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
-            logger.info("--> done");
-        } else {
-            logger.info("--> done without blocks");
-            ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots();
-            assertThat(snapshotInfos.size(), equalTo(1));
-            assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
-            assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
-        }
+        logger.info("--> waiting for block to kick in");
+        waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
+
+        logger.info("--> execution was blocked on node [{}], shutting it down", blockedNode);
+        unblockNode(blockedNode);
+
+        logger.info("--> stopping node", blockedNode);
+        stopNode(blockedNode);
+        logger.info("--> waiting for completion");
+        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60));
+        logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
+        logger.info("--> done");
     }
 }
SharedClusterSnapshotRestoreTests.java

@@ -665,7 +665,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
 
     @Test
     @TestLogging("cluster.routing.allocation.decider:TRACE")
-    // @LuceneTestCase.AwaitsFix(bugUrl="imotov is working on the fix")
     public void moveShardWhileSnapshottingTest() throws Exception {
         Client client = client();
         File repositoryLocation = newTempDir(LifecycleScope.TEST);
@@ -675,7 +674,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
                         ImmutableSettings.settingsBuilder()
                                 .put("location", repositoryLocation)
                                 .put("random", randomAsciiOfLength(10))
-                                .put("random_data_file_blocking_rate", 0.1)
                                 .put("wait_after_unblock", 200)
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@@ -690,23 +688,28 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         refresh();
         assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
 
+        // Pick one node and block it
+        String blockedNode = blockNodeWithIndex("test-idx");
+
         logger.info("--> snapshot");
         client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
-        String blockedNode = waitForCompletionOrBlock(cluster().nodesInclude("test-idx"), "test-repo", "test-snap", TimeValue.timeValueSeconds(60));
-        if (blockedNode != null) {
-            logger.info("--> move shards away from the node");
-            ImmutableSettings.Builder excludeSettings = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", blockedNode);
-            client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get();
-            logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode);
-            unblock("test-repo");
-            logger.info("--> waiting for completion");
-            SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60));
-            logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
-            logger.info("--> done");
-        } else {
-            logger.info("--> done without blocks");
-        }
+
+        logger.info("--> waiting for block to kick in");
+        waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
+
+        logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode);
+        ImmutableSettings.Builder excludeSettings = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", blockedNode);
+        client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get();
+
+        logger.info("--> unblocking blocked node");
+        unblockNode(blockedNode);
+        logger.info("--> waiting for completion");
+        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
+        logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
+        logger.info("--> done");
+
         ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots();
+
         assertThat(snapshotInfos.size(), equalTo(1));
         assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
         assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
MockRepository.java

@@ -46,10 +46,6 @@ public class MockRepository extends FsRepository {
 
     private final AtomicLong failureCounter = new AtomicLong();
 
-    private volatile boolean blockable = true;
-
-    private volatile boolean blocked = false;
-
     public void resetFailureCount() {
         failureCounter.set(0);
     }
@@ -62,23 +58,25 @@ public class MockRepository extends FsRepository {
 
     private final double randomDataFileIOExceptionRate;
 
-    private final double randomControlBlockingRate;
-
-    private final double randomDataFileBlockingRate;
-
     private final long waitAfterUnblock;
 
     private final MockBlobStore mockBlobStore;
 
     private final String randomPrefix;
 
+    private volatile boolean blockOnControlFiles;
+
+    private volatile boolean blockOnDataFiles;
+
+    private volatile boolean blocked = false;
+
     @Inject
     public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
         super(name, repositorySettings, indexShardRepository);
         randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
         randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
-        randomControlBlockingRate = repositorySettings.settings().getAsDouble("random_control_blocking_rate", 0.0);
-        randomDataFileBlockingRate = repositorySettings.settings().getAsDouble("random_data_file_blocking_rate", 0.0);
+        blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false);
+        blockOnDataFiles = repositorySettings.settings().getAsBoolean("block_on_data", false);
         randomPrefix = repositorySettings.settings().get("random");
         waitAfterUnblock = repositorySettings.settings().getAsLong("wait_after_unblock", 0L);
         logger.info("starting mock repository with random prefix " + randomPrefix);
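The constructor above swaps the two random blocking rates for two boolean settings, block_on_control and block_on_data. The tests in this commit arm blocking programmatically (blockNodeWithIndex calls blockOnDataFiles(true) on one node), but the new settings also allow a repository to be registered with blocking pre-armed. A sketch of such a registration follows; only the block_on_* keys are the point here, the repository name, location and surrounding helpers are illustrative and borrowed from the tests above.

// Sketch only: registering the mock repository with blocking pre-armed via settings.
// "block_on_data", "block_on_control" and "wait_after_unblock" are read by the
// constructor above; the repository name and location are illustrative.
PutRepositoryResponse putRepositoryResponse = client().admin().cluster()
        .preparePutRepository("test-repo")
        .setType("mock")
        .setSettings(ImmutableSettings.settingsBuilder()
                .put("location", newTempDir(LifecycleScope.TEST))
                .put("random", randomAsciiOfLength(10))
                .put("block_on_data", true)       // stall on the first data-file write
                .put("wait_after_unblock", 200))  // 200ms grace period after unblock()
        .get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));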
@@ -108,6 +106,14 @@ public class MockRepository extends FsRepository {
         mockBlobStore.unblockExecution();
     }
 
+    public void blockOnDataFiles(boolean blocked) {
+        blockOnDataFiles = blocked;
+    }
+
+    public void blockOnControlFiles(boolean blocked) {
+        blockOnControlFiles = blocked;
+    }
+
     public class MockBlobStore extends BlobStoreWrapper {
         ConcurrentMap<String, AtomicLong> accessCounts = new ConcurrentHashMap<String, AtomicLong>();
 
@@ -133,8 +139,10 @@ public class MockRepository extends FsRepository {
 
         public synchronized void unblockExecution() {
             if (blocked) {
-                blockable = false;
                 blocked = false;
+                // Clean blocking flags, so we wouldn't try to block again
+                blockOnDataFiles = false;
+                blockOnControlFiles = false;
                 this.notifyAll();
             }
         }
@@ -146,12 +154,13 @@ public class MockRepository extends FsRepository {
         private synchronized boolean blockExecution() {
             boolean wasBlocked = false;
             try {
-                while (blockable) {
+                while (blockOnDataFiles || blockOnControlFiles) {
                     blocked = true;
                     this.wait();
                     wasBlocked = true;
                 }
             } catch (InterruptedException ex) {
                 Thread.currentThread().interrupt();
             }
             return wasBlocked;
         }
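blockExecution() and unblockExecution() above form a plain intrinsic-lock handshake: the blob-store thread loops in wait() while either block flag is set (so spurious wakeups are harmless), and the test thread clears the flags and calls notifyAll() on the same monitor, which is what makes waitForBlock/unblockNode deterministic. Stripped of the repository plumbing, the pattern looks roughly like this (a standalone illustration, not code from the commit):

// Standalone illustration of the handshake used by MockBlobStore (not part of the commit).
public class Blocker {
    private volatile boolean shouldBlock = true;  // plays the role of blockOnDataFiles/blockOnControlFiles
    private volatile boolean blocked = false;     // what a waitForBlock-style poller would read

    public boolean blocked() {
        return blocked;
    }

    public synchronized boolean blockExecution() throws InterruptedException {
        boolean wasBlocked = false;
        while (shouldBlock) {    // re-check the flag after every wakeup
            blocked = true;
            wait();              // releases the monitor until unblock() notifies
            wasBlocked = true;
        }
        blocked = false;
        return wasBlocked;
    }

    public synchronized void unblock() {
        shouldBlock = false;     // clear the flag first so the loop above can exit
        blocked = false;
        notifyAll();             // wake every thread parked in blockExecution()
    }
}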
@@ -190,7 +199,7 @@ public class MockRepository extends FsRepository {
                 logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
                 addFailure();
                 throw new IOException("Random IOException");
-            } else if (shouldFail(blobName, randomDataFileBlockingRate)) {
+            } else if (blockOnDataFiles) {
                 logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
                 if (blockExecution() && waitAfterUnblock > 0) {
                     try {
@@ -207,7 +216,7 @@ public class MockRepository extends FsRepository {
                 logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
                 addFailure();
                 throw new IOException("Random IOException");
-            } else if (shouldFail(blobName, randomControlBlockingRate)) {
+            } else if (blockOnControlFiles) {
                 logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
                 if (blockExecution() && waitAfterUnblock > 0) {
                     try {