Remove tests: they are fragile, hard to debug, and add no value.

Closes #13542, closes #13343, closes #13316
parent ac2be641b3
commit 997ea08e7b
DiscoveryWithServiceDisruptionsIT.java

@@ -21,30 +21,22 @@ package org.elasticsearch.discovery;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
-import org.elasticsearch.action.admin.indices.flush.FlushResponse;
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.*;
-import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.DjbHashFunction;
-import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
@@ -55,9 +47,7 @@ import org.elasticsearch.discovery.zen.ping.ZenPing;
 import org.elasticsearch.discovery.zen.ping.ZenPingService;
 import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
 import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
-import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.recovery.RecoverySource;
 import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -938,234 +928,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         ensureStableCluster(3);
     }
-
-    /*
-     * Tests a visibility issue if a shard is in POST_RECOVERY.
-     *
-     * When a user indexes a document, then refreshes and then executes a search, and all of these succeed with no
-     * timeouts etc., then the document must be visible for the search.
-     *
-     * When a primary is relocating from node_1 to node_2, there can be a short time where both old and new primary
-     * are started and accept indexing and read requests. However, the new primary might not be visible to nodes
-     * that lag behind by one cluster state. If such a node then sends a refresh to the index, this refresh request
-     * must reach the new primary on node_2 too. Otherwise a different node that searches on the new primary might not
-     * find the indexed document although a refresh was executed before.
-     *
-     * In detail:
-     * Cluster state 0:
-     *     node_1: [index][0] STARTED (ShardRoutingState)
-     *     node_2: no shard
-     *
-     * 0. primary ([index][0]) relocates from node_1 to node_2
-     *    Cluster state 1:
-     *     node_1: [index][0] RELOCATING (ShardRoutingState), (STARTED from IndexShardState perspective on node_1)
-     *     node_2: [index][0] INITIALIZING (ShardRoutingState), (IndexShardState on node_2 is RECOVERING)
-     *
-     * 1. node_2 is done recovering, moves its shard to IndexShardState.POST_RECOVERY and sends a message to master that the shard is ShardRoutingState.STARTED.
-     *    The cluster state is still the same but the IndexShardState on node_2 has changed and it now accepts writes and reads:
-     *     node_1: [index][0] RELOCATING (ShardRoutingState), (STARTED from IndexShardState perspective on node_1)
-     *     node_2: [index][0] INITIALIZING (ShardRoutingState), (IndexShardState on node_2 is POST_RECOVERY)
-     *
-     * 2. any node receives an index request which is then executed on node_1 and node_2
-     *
-     * 3. node_3 sends a refresh but it is a little behind with cluster state processing and still on cluster state 0.
-     *    Since refresh is a broadcast operation, node_3 sends it to node_1 only, because it does not know node_2 has a shard too.
-     *
-     * 4. node_3 catches up with the cluster state and acks it to master, which can now process the shard started message
-     *    from node_2 and updates the cluster state to:
-     *    Cluster state 2:
-     *     node_1: no shard
-     *     node_2: [index][0] STARTED (ShardRoutingState), (IndexShardState on node_2 is still POST_RECOVERY)
-     *
-     *    master sends this to all nodes.
-     *
-     * 5. node_4 and node_3 process cluster state 2, but node_1 and node_2 have not yet done so.
-     *
-     * If node_4 now searches for the document that was indexed before, it will search on node_2 because node_4 is on
-     * cluster state 2. It should be able to retrieve the document with a search because the refresh from before was
-     * successful.
-     */
-    @Test
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/13316")
-    public void testReadOnPostRecoveryShards() throws Exception {
-        List<BlockClusterStateProcessing> clusterStateBlocks = new ArrayList<>();
-        try {
-            configureUnicastCluster(5, null, 1);
-            // we could probably write a test without a dedicated master node but it is easier if we use one
-            InternalTestCluster.Async<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
-            // node_1 will have the shard in the beginning
-            InternalTestCluster.Async<String> node1Future = internalCluster().startDataOnlyNodeAsync();
-            final String masterNode = masterNodeFuture.get();
-            final String node_1 = node1Future.get();
-            logger.info("--> creating index [test] with one shard and zero replicas");
-            assertAcked(prepareCreate("test").setSettings(
-                    Settings.builder().put(indexSettings())
-                            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                            .put(IndexShard.INDEX_REFRESH_INTERVAL, -1))
-                    .addMapping("doc", jsonBuilder().startObject().startObject("doc")
-                            .startObject("properties").startObject("text").field("type", "string").endObject().endObject()
-                            .endObject().endObject())
-            );
-            ensureGreen("test");
-            logger.info("--> starting three more data nodes");
-            List<String> nodeNamesFuture = internalCluster().startDataOnlyNodesAsync(3).get();
-            final String node_2 = nodeNamesFuture.get(0);
-            final String node_3 = nodeNamesFuture.get(1);
-            final String node_4 = nodeNamesFuture.get(2);
-            logger.info("--> running cluster_health");
-            ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
-                    .setWaitForNodes("5")
-                    .setWaitForRelocatingShards(0)
-                    .get();
-            assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
-            logger.info("--> move shard from node_1 to node_2, and wait for relocation to finish");
-
-            // block cluster state updates on node_3 so that it only sees the shard on node_1
-            BlockClusterStateProcessing disruptionNode3 = new BlockClusterStateProcessing(node_3, getRandom());
-            clusterStateBlocks.add(disruptionNode3);
-            internalCluster().setDisruptionScheme(disruptionNode3);
-            disruptionNode3.startDisrupting();
-            // register a Tracer that notifies begin and end of a relocation
-            MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
-            CountDownLatch beginRelocationLatchNode2 = new CountDownLatch(1);
-            CountDownLatch endRelocationLatchNode2 = new CountDownLatch(1);
-            transportServiceNode2.addTracer(new StartRecoveryToShardStaredTracer(logger, beginRelocationLatchNode2, endRelocationLatchNode2));
-
-            // prepare to block cluster state updates on node_1 and node_2 so that we end up with two primaries
-            BlockClusterStateProcessing disruptionNode2 = new BlockClusterStateProcessing(node_2, getRandom());
-            clusterStateBlocks.add(disruptionNode2);
-            disruptionNode2.applyToCluster(internalCluster());
-            BlockClusterStateProcessing disruptionNode1 = new BlockClusterStateProcessing(node_1, getRandom());
-            clusterStateBlocks.add(disruptionNode1);
-            disruptionNode1.applyToCluster(internalCluster());
-
-            logger.info("--> move shard from node_1 to node_2");
-            // don't block on the relocation: cluster state updates are blocked on node_3 and the relocation would time out
-            Future<ClusterRerouteResponse> rerouteFuture = internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)).execute();
-
-            logger.info("--> wait for relocation to start");
-            beginRelocationLatchNode2.await();
-            // start to block cluster state updates on node_1 and node_2 so that we end up with two primaries:
-            // one STARTED on node_1 and one in POST_RECOVERY on node_2
-            disruptionNode1.startDisrupting();
-            disruptionNode2.startDisrupting();
-            endRelocationLatchNode2.await();
-            final Client node3Client = internalCluster().client(node_3);
-            final Client node2Client = internalCluster().client(node_2);
-            final Client node1Client = internalCluster().client(node_1);
-            final Client node4Client = internalCluster().client(node_4);
-            logger.info("--> index doc");
-            logLocalClusterStates(node1Client, node2Client, node3Client, node4Client);
-            assertTrue(node3Client.prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").get().isCreated());
-            // sometimes refresh and sometimes flush
-            int refreshOrFlushType = randomIntBetween(1, 2);
-            switch (refreshOrFlushType) {
-                case 1: {
-                    logger.info("--> refresh from node_3");
-                    RefreshResponse refreshResponse = node3Client.admin().indices().prepareRefresh().get();
-                    assertThat(refreshResponse.getFailedShards(), equalTo(0));
-                    // the total shards is num replicas + 1, which can be lower here because one shard
-                    // is relocating and counts twice as successful
-                    assertThat(refreshResponse.getTotalShards(), equalTo(2));
-                    assertThat(refreshResponse.getSuccessfulShards(), equalTo(2));
-                    break;
-                }
-                case 2: {
-                    logger.info("--> flush from node_3");
-                    FlushResponse flushResponse = node3Client.admin().indices().prepareFlush().get();
-                    assertThat(flushResponse.getFailedShards(), equalTo(0));
-                    // the total shards is num replicas + 1, which can be lower here because one shard
-                    // is relocating and counts twice as successful
-                    assertThat(flushResponse.getTotalShards(), equalTo(2));
-                    assertThat(flushResponse.getSuccessfulShards(), equalTo(2));
-                    break;
-                }
-                default:
-                    fail("this is a test bug, number should be between 1 and 2");
-            }
-            // now stop disrupting so that node_3 can ack the last cluster state to master and master can continue
-            // to publish the next cluster state
-            logger.info("--> stop disrupting node_3");
-            disruptionNode3.stopDisrupting();
-            rerouteFuture.get();
-            logger.info("--> wait for node_4 to get new cluster state");
-            // wait until node_4 actually has the new cluster state in which node_1 has no shard
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    ClusterState clusterState = node4Client.admin().cluster().prepareState().setLocal(true).get().getState();
-                    // get the node id from the name. TODO: Is there a better way to do this?
-                    String nodeId = null;
-                    for (RoutingNode node : clusterState.getRoutingNodes()) {
-                        if (node.node().name().equals(node_1)) {
-                            nodeId = node.nodeId();
-                        }
-                    }
-                    assertNotNull(nodeId);
-                    // check that node_1 does not have the shard in its local cluster state
-                    assertFalse(clusterState.getRoutingNodes().routingNodeIter(nodeId).hasNext());
-                }
-            });
-
-            logger.info("--> run count from node_4");
-            logLocalClusterStates(node1Client, node2Client, node3Client, node4Client);
-            CountResponse countResponse = node4Client.prepareCount("test").setPreference("local").get();
-            assertThat(countResponse.getCount(), equalTo(1l));
-            logger.info("--> stop disrupting node_1 and node_2");
-            disruptionNode2.stopDisrupting();
-            disruptionNode1.stopDisrupting();
-            logger.info("--> wait for relocation to finish");
-            clusterHealth = client().admin().cluster().prepareHealth()
-                    .setWaitForRelocatingShards(0)
-                    .get();
-            assertThat(clusterHealth.isTimedOut(), equalTo(false));
-        } catch (AssertionError e) {
-            for (BlockClusterStateProcessing blockClusterStateProcessing : clusterStateBlocks) {
-                blockClusterStateProcessing.stopDisrupting();
-            }
-            throw e;
-        }
-    }
-
-    /**
-     * This Tracer can be used to signal the start of a recovery and the shard started event after the translog was copied.
-     */
-    public static class StartRecoveryToShardStaredTracer extends MockTransportService.Tracer {
-        private final ESLogger logger;
-        private final CountDownLatch beginRelocationLatch;
-        private final CountDownLatch sentShardStartedLatch;
-
-        public StartRecoveryToShardStaredTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch sentShardStartedLatch) {
-            this.logger = logger;
-            this.beginRelocationLatch = beginRelocationLatch;
-            this.sentShardStartedLatch = sentShardStartedLatch;
-        }
-
-        @Override
-        public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
-            if (action.equals(RecoverySource.Actions.START_RECOVERY)) {
-                logger.info("sent: {}, relocation starts", action);
-                beginRelocationLatch.countDown();
-            }
-            if (action.equals(ShardStateAction.SHARD_STARTED_ACTION_NAME)) {
-                logger.info("sent: {}, shard started", action);
-                sentShardStartedLatch.countDown();
-            }
-        }
-    }
-
-    private void logLocalClusterStates(Client... clients) {
-        int counter = 1;
-        for (Client client : clients) {
-            ClusterState clusterState = client.admin().cluster().prepareState().setLocal(true).get().getState();
-            logger.info("--> cluster state on node_{} {}", counter, clusterState.prettyPrint());
-            counter++;
-        }
-    }
 
     /**
      * This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
     * node but already deleted on the source node. Search request should still work.
RelocationIT.java

@@ -23,9 +23,6 @@ import com.carrotsearch.hppc.IntHashSet;
 import com.carrotsearch.hppc.procedures.IntProcedure;
 import org.apache.lucene.index.IndexFileNames;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchResponse;
@@ -37,20 +34,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
-import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.discovery.DiscoveryService;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesLifecycle;
 import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
-import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
@@ -58,14 +51,9 @@ import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
-import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.transport.*;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -85,12 +73,8 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.ESIntegTestCase.Scope;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.startsWith;
+import static org.hamcrest.Matchers.*;
 
 /**
  */
@@ -361,89 +345,6 @@ public class RelocationIT extends ESIntegTestCase {
         }
     }
-
-    @Test
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/13542")
-    public void testMoveShardsWhileRelocation() throws Exception {
-        final String indexName = "test";
-
-        InternalTestCluster.Async<String> blueFuture = internalCluster().startNodeAsync(Settings.builder().put("node.color", "blue").build());
-        InternalTestCluster.Async<String> redFuture = internalCluster().startNodeAsync(Settings.builder().put("node.color", "red").build());
-        internalCluster().startNode(Settings.builder().put("node.color", "green").build());
-        final String blueNodeName = blueFuture.get();
-        final String redNodeName = redFuture.get();
-
-        ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
-        assertThat(response.isTimedOut(), is(false));
-
-        client().admin().indices().prepareCreate(indexName)
-                .setSettings(
-                        Settings.builder()
-                                .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")
-                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                ).get();
-
-        List<IndexRequestBuilder> requests = new ArrayList<>();
-        int numDocs = scaledRandomIntBetween(25, 250);
-        for (int i = 0; i < numDocs; i++) {
-            requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
-        }
-        indexRandom(true, requests);
-        ensureSearchable(indexName);
-
-        ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
-        String blueNodeId = internalCluster().getInstance(DiscoveryService.class, blueNodeName).localNode().id();
-
-        assertFalse(stateResponse.getState().getRoutingNodes().node(blueNodeId).isEmpty());
-
-        SearchResponse searchResponse = client().prepareSearch(indexName).get();
-        assertHitCount(searchResponse, numDocs);
-
-        // Slow down recovery in order to make recovery cancellations more likely
-        IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexName).get();
-        long chunkSize = Math.max(1, statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10);
-        assertTrue(client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder()
-                        // one chunk per second...
-                        .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize, ByteSizeUnit.BYTES)
-                        .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize, ByteSizeUnit.BYTES)
-                )
-                .get().isAcknowledged());
-
-        client().admin().indices().prepareUpdateSettings(indexName).setSettings(
-                Settings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")
-        ).get();
-
-        // Let's wait a bit and then move again, to hopefully trigger recovery cancellations.
-        boolean applied = awaitBusy(
-                () -> {
-                    RecoveryResponse recoveryResponse =
-                            internalCluster().client(redNodeName).admin().indices().prepareRecoveries(indexName).get();
-                    return !recoveryResponse.shardRecoveryStates().get(indexName).isEmpty();
-                }
-        );
-        assertTrue(applied);
-        client().admin().indices().prepareUpdateSettings(indexName).setSettings(
-                Settings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "green")
-        ).get();
-
-        // Restore the recovery speed so the cluster health call does not time out
-        assertTrue(client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder()
-                        .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb")
-                        .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb")
-                )
-                .get().isAcknowledged());
-
-        // this also waits for all ongoing recoveries to complete:
-        ensureSearchable(indexName);
-        searchResponse = client().prepareSearch(indexName).get();
-        assertHitCount(searchResponse, numDocs);
-
-        stateResponse = client().admin().cluster().prepareState().get();
-        assertTrue(stateResponse.getState().getRoutingNodes().node(blueNodeId).isEmpty());
-    }
-
     @Test
     public void testCancellationCleansTempFiles() throws Exception {
         final String indexName = "test";
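Both removed tests depend on poll-until-true helpers (`assertBusy`, `awaitBusy`) to wait for the cluster to converge, and that timing-sensitive waiting is the kind of fragility the commit message cites. For reference, here is a minimal, self-contained sketch of such a polling helper in plain Java; the class name and backoff policy are illustrative assumptions, not the Elasticsearch test framework's actual implementation:

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class AwaitBusySketch {

    // Polls the condition until it returns true or the timeout elapses,
    // sleeping with an exponentially growing interval between attempts.
    public static boolean awaitBusy(BooleanSupplier condition, long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        long sleepMillis = 1;
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(sleepMillis);
            sleepMillis = Math.min(sleepMillis * 2, 500); // back off, capped at 500 ms
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Hypothetical condition: becomes true roughly 200 ms after start.
        boolean met = awaitBusy(() -> System.currentTimeMillis() - start > 200, 5, TimeUnit.SECONDS);
        System.out.println("condition met: " + met);
    }
}

A test built on such a helper passes or fails depending on how quickly the cluster reaches the expected state, which is why suites like these tend to be flaky on slow or loaded machines.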