enable testAckedIndexing

Boaz Leskes 2016-02-12 10:06:25 +01:00
parent 15a9da4d84
commit 74194b8f0f
1 changed file with 97 additions and 76 deletions

@@ -20,7 +20,6 @@
 package org.elasticsearch.discovery;
 
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -147,7 +146,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         return startCluster(numberOfNodes, minimumMasterNode, null);
     }
 
-    private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws ExecutionException, InterruptedException {
+    private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
+        ExecutionException, InterruptedException {
         configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
         List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
         ensureStableCluster(numberOfNodes);
@@ -176,7 +176,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         return pluginList(MockTransportService.TestPlugin.class);
     }
 
-    private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
+    private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws
+        ExecutionException, InterruptedException {
         if (minimumMasterNode < 0) {
             minimumMasterNode = numberOfNodes / 2 + 1;
         }
@@ -251,7 +252,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         logger.info("--> reducing min master nodes to 2");
         assertAcked(client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)).get());
+                .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
+                .get());
 
         String master = internalCluster().getMasterName();
         String nonMaster = null;
@ -378,7 +380,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
networkPartition.stopDisrupting(); networkPartition.stopDisrupting();
for (String node : nodes) { for (String node : nodes) {
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()), true, node); ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()),
true, node);
} }
logger.info("issue a reroute"); logger.info("issue a reroute");
@@ -419,9 +422,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
      * <p>
      * This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
      */
-    // NOTE: if you remove the awaitFix, make sure to port the test to the 1.x branch
-    @LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize")
-    @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
+    @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,"
+        + "indices.recovery:TRACE,indices.cluster:TRACE")
     public void testAckedIndexing() throws Exception {
         // TODO: add node count randomizaion
         final List<String> nodes = startCluster(3);
@@ -453,9 +455,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                 final Client client = client(node);
                 final String name = "indexer_" + indexers.size();
                 final int numPrimaries = getNumShards("test").numPrimaries;
-                Thread thread = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
+                Thread thread = new Thread(() -> {
                         while (!stop.get()) {
                             String id = null;
                             try {
@@ -467,7 +467,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                                 id = Integer.toString(idGenerator.incrementAndGet());
                                 int shard = MathUtils.mod(Murmur3HashFunction.hash(id), numPrimaries);
                                 logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
-                                IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get();
+                                IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get("1s");
                                 assertThat(response.getVersion(), equalTo(1L));
                                 ackedDocs.put(id, node);
                                 logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
@@ -484,7 +484,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                                 logger.info("unexpected exception in background thread of [{}]", t, node);
                             }
                         }
-                    }
                 });
 
                 thread.setName(name);
@@ -512,11 +511,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                 assertThat(semaphore.availablePermits(), equalTo(0));
                 semaphore.release(docsPerIndexer);
             }
-            assertTrue(countDownLatchRef.get().await(60000 + disruptionScheme.expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS));
+            logger.info("waiting for indexing requests to complete");
+            assertTrue(countDownLatchRef.get().await(docsPerIndexer * 1000 + 2000, TimeUnit.MILLISECONDS));
 
             logger.info("stopping disruption");
             disruptionScheme.stopDisrupting();
-            ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + DISRUPTION_HEALING_OVERHEAD.millis()));
+            ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
+                DISRUPTION_HEALING_OVERHEAD.millis()));
             ensureGreen("test");
 
             logger.info("validating successful docs");
@@ -581,7 +582,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         // restore GC
         masterNodeDisruption.stopDisrupting();
-        ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis()), false,
+        ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis()),
+            false,
                 oldNonMasterNodes.get(0));
 
         // make sure all nodes agree on master
@@ -614,7 +616,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         majoritySide.remove(oldMasterNode);
 
         // Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
-        final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String, String>>>());
+        final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String,
+            String>>>());
         for (final String node : majoritySide) {
             masters.put(node, new ArrayList<Tuple<String, String>>());
             internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() {
@@ -623,7 +626,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                     DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
                     DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
                     if (!Objects.equals(previousMaster, currentMaster)) {
-                        logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(), event.previousState());
+                        logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
+                            event.previousState());
                         String previousMasterNodeName = previousMaster != null ? previousMaster.name() : null;
                         String currentMasterNodeName = currentMaster != null ? currentMaster.name() : null;
                         masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
@@ -655,7 +659,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         // but will be queued and once the old master node un-freezes it gets executed.
         // The old master node will send this update + the cluster state where he is flagged as master to the other
         // nodes that follow the new master. These nodes should ignore this update.
-        internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
+        internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
+            ClusterStateUpdateTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {
                 return ClusterState.builder(currentState).build();
@@ -692,11 +697,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
             String nodeName = entry.getKey();
             List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
-            assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(), equalTo(2));
-            assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(), equalTo(oldMasterNode));
-            assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(0).v2(), nullValue());
-            assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(), nullValue());
-            assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
+            assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(),
+                equalTo(2));
+            assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(),
+                equalTo(oldMasterNode));
+            assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition
+                .get(0).v2(), nullValue());
+            assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(),
+                nullValue());
+            assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
+                recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
         }
     }
 
@@ -726,7 +736,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
 
-        IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get();
+        IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value")
+            .get();
         assertThat(indexResponse.getVersion(), equalTo(1L));
 
         logger.info("Verifying if document exists via node[" + notIsolatedNode + "]");
@@ -844,17 +855,21 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
 
-        TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.masterNode().getName());
+        TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.masterNode()
+            .getName());
 
         logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
-        MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);
+        MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
+            nonMasterNode);
         nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
 
         assertNoMaster(nonMasterNode);
 
         logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
-        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
-        TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.localNode().getName());
+        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
+            masterNode);
+        TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.localNode().getName
+            ());
         if (randomBoolean()) {
             masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
         } else {
@@ -863,9 +878,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
         final CountDownLatch countDownLatch = new CountDownLatch(2);
-        nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService.original()) {
+        nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
+            .original()) {
             @Override
-            public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+            public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions
+                options) throws IOException, TransportException {
                 if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
                     countDownLatch.countDown();
                 }
@@ -911,7 +928,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
         networkPartition.startDisrupting();
-        service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
+        service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
+            ShardStateAction.Listener() {
             @Override
             public void onSuccess() {
                 success.set(true);
@@ -988,7 +1006,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
         logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
-        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
+        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
+            masterNode);
         if (randomBoolean()) {
             masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
         } else {
@@ -1039,7 +1058,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
         CountDownLatch beginRelocationLatch = new CountDownLatch(1);
         CountDownLatch endRelocationLatch = new CountDownLatch(1);
-        transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
+        transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch,
+            endRelocationLatch));
         internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get();
         // wait for relocation to start
         beginRelocationLatch.await();
@@ -1076,7 +1096,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11665")
     public void testIndicesDeleted() throws Exception {
         configureUnicastCluster(3, null, 2);
-        InternalTestCluster.Async<List<String>> masterNodes= internalCluster().startMasterOnlyNodesAsync(2);
+        InternalTestCluster.Async<List<String>> masterNodes = internalCluster().startMasterOnlyNodesAsync(2);
         InternalTestCluster.Async<String> dataNode = internalCluster().startDataOnlyNodeAsync();
         dataNode.get();
         masterNodes.get();
@@ -1158,7 +1178,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                 assertNull("node [" + node + "] still has [" + state.nodes().masterNode() + "] as master", state.nodes().masterNode());
                 if (expectedBlocks != null) {
                     for (ClusterBlockLevel level : expectedBlocks.levels()) {
-                        assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock(level));
+                        assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
+                            (level));
                     }
                 }
             }