Merge branch 'master' into log4j2
* master:
  Avoid NPE in LoggingListener
  Randomly use Netty 3 plugin in some tests
  Skip smoke test client on JDK 9
  Revert "Don't allow XContentBuilder#writeValue(TimeValue)"
  [docs] Remove coming in 2.0.0
  Don't allow XContentBuilder#writeValue(TimeValue)
  [doc] Remove leftover from CONSOLE conversion
  Parameter improvements to Cluster Health API wait for shards (#20223)
  Add 2.4.0 to packaging tests list
  Docs: clarify scale is applied at origin+offest (#20242)
Commit: 76ab02e002
ClusterHealthRequest.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.health;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.MasterNodeReadRequest;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
@@ -41,8 +42,8 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
     private String[] indices;
     private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS);
     private ClusterHealthStatus waitForStatus;
-    private int waitForRelocatingShards = -1;
-    private int waitForActiveShards = -1;
+    private boolean waitForNoRelocatingShards = false;
+    private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
     private String waitForNodes = "";
     private Priority waitForEvents = null;
@@ -102,24 +103,52 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
         return waitForStatus(ClusterHealthStatus.YELLOW);
     }

-    public int waitForRelocatingShards() {
-        return waitForRelocatingShards;
+    public boolean waitForNoRelocatingShards() {
+        return waitForNoRelocatingShards;
     }

-    public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards) {
-        this.waitForRelocatingShards = waitForRelocatingShards;
+    /**
+     * Sets whether the request should wait for there to be no relocating shards before
+     * retrieving the cluster health status. Defaults to {@code false}, meaning the
+     * operation does not wait on there being no more relocating shards. Set to <code>true</code>
+     * to wait until the number of relocating shards in the cluster is 0.
+     */
+    public ClusterHealthRequest waitForNoRelocatingShards(boolean waitForNoRelocatingShards) {
+        this.waitForNoRelocatingShards = waitForNoRelocatingShards;
         return this;
     }

-    public int waitForActiveShards() {
+    public ActiveShardCount waitForActiveShards() {
         return waitForActiveShards;
     }

-    public ClusterHealthRequest waitForActiveShards(int waitForActiveShards) {
-        this.waitForActiveShards = waitForActiveShards;
+    /**
+     * Sets the number of shard copies that must be active across all indices before getting the
+     * health status. Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.
+     * Set this value to {@link ActiveShardCount#ALL} to wait for all shards (primary and
+     * all replicas) to be active across all indices in the cluster. Otherwise, use
+     * {@link ActiveShardCount#from(int)} to set this value to any non-negative integer, up to the
+     * total number of shard copies to wait for.
+     */
+    public ClusterHealthRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
+        if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) {
+            // the default for cluster health request is 0, not 1
+            this.waitForActiveShards = ActiveShardCount.NONE;
+        } else {
+            this.waitForActiveShards = waitForActiveShards;
+        }
+        return this;
+    }
+
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public ClusterHealthRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }

     public String waitForNodes() {
         return waitForNodes;
     }
@@ -162,8 +191,8 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
         if (in.readBoolean()) {
             waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
         }
-        waitForRelocatingShards = in.readInt();
-        waitForActiveShards = in.readInt();
+        waitForNoRelocatingShards = in.readBoolean();
+        waitForActiveShards = ActiveShardCount.readFrom(in);
         waitForNodes = in.readString();
         if (in.readBoolean()) {
             waitForEvents = Priority.readFrom(in);
@@ -188,8 +217,8 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
             out.writeBoolean(true);
             out.writeByte(waitForStatus.value());
         }
-        out.writeInt(waitForRelocatingShards);
-        out.writeInt(waitForActiveShards);
+        out.writeBoolean(waitForNoRelocatingShards);
+        waitForActiveShards.writeTo(out);
         out.writeString(waitForNodes);
         if (waitForEvents == null) {
             out.writeBoolean(false);
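In short, the two sentinel-valued int fields become a boolean flag and an ActiveShardCount, and the wire format changes with them. A minimal sketch of the new request-side API, assuming only the classes shown in this diff (the helper method name is hypothetical):

    import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
    import org.elasticsearch.action.support.ActiveShardCount;

    class ClusterHealthRequestSketch {
        // Illustrative helper, not part of the patch: a request that waits until
        // relocation has stopped and every shard copy is active.
        static ClusterHealthRequest waitForQuietCluster() {
            return new ClusterHealthRequest()
                .waitForNoRelocatingShards(true)             // was waitForRelocatingShards(0)
                .waitForActiveShards(ActiveShardCount.ALL);  // plain ints still work via the from(int) shortcut
        }
    }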
ClusterHealthRequestBuilder.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.action.admin.cluster.health;

+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
@@ -64,11 +65,40 @@ public class ClusterHealthRequestBuilder extends MasterNodeReadOperationRequestB
         return this;
     }

-    public ClusterHealthRequestBuilder setWaitForRelocatingShards(int waitForRelocatingShards) {
-        request.waitForRelocatingShards(waitForRelocatingShards);
+    /**
+     * Sets whether the request should wait for there to be no relocating shards before
+     * retrieving the cluster health status. Defaults to <code>false</code>, meaning the
+     * operation does not wait on there being no more relocating shards. Set to <code>true</code>
+     * to wait until the number of relocating shards in the cluster is 0.
+     */
+    public ClusterHealthRequestBuilder setWaitForNoRelocatingShards(boolean waitForRelocatingShards) {
+        request.waitForNoRelocatingShards(waitForRelocatingShards);
         return this;
     }

+    /**
+     * Sets the number of shard copies that must be active before getting the health status.
+     * Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.
+     * Set this value to {@link ActiveShardCount#ALL} to wait for all shards (primary and
+     * all replicas) to be active across all indices in the cluster. Otherwise, use
+     * {@link ActiveShardCount#from(int)} to set this value to any non-negative integer, up to the
+     * total number of shard copies that would exist across all indices in the cluster.
+     */
+    public ClusterHealthRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
+        if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) {
+            // the default for cluster health is 0, not 1
+            request.waitForActiveShards(ActiveShardCount.NONE);
+        } else {
+            request.waitForActiveShards(waitForActiveShards);
+        }
+        return this;
+    }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
     public ClusterHealthRequestBuilder setWaitForActiveShards(int waitForActiveShards) {
         request.waitForActiveShards(waitForActiveShards);
         return this;
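The same change as seen from builder-based client code; this sketch assumes a transport `client` is already wired up and mirrors the calls the integration tests below were migrated to:

    import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
    import org.elasticsearch.client.Client;

    class HealthBuilderSketch {
        static void awaitGreen(Client client) {
            ClusterHealthResponse health = client.admin().cluster().prepareHealth()
                .setWaitForGreenStatus()
                .setWaitForNoRelocatingShards(true)   // was setWaitForRelocatingShards(0)
                .get();
            assert health.isTimedOut() == false;
        }
    }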
TransportClusterHealthAction.java

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
 import org.elasticsearch.cluster.ClusterState;
@@ -127,10 +128,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
         if (request.waitForStatus() == null) {
             waitFor--;
         }
-        if (request.waitForRelocatingShards() == -1) {
+        if (request.waitForNoRelocatingShards() == false) {
             waitFor--;
         }
-        if (request.waitForActiveShards() == -1) {
+        if (request.waitForActiveShards().equals(ActiveShardCount.NONE)) {
             waitFor--;
         }
         if (request.waitForNodes().isEmpty()) {
@@ -205,11 +206,22 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
         if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
             waitForCounter++;
         }
-        if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) {
+        if (request.waitForNoRelocatingShards() && response.getRelocatingShards() == 0) {
             waitForCounter++;
         }
-        if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) {
-            waitForCounter++;
+        if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
+            ActiveShardCount waitForActiveShards = request.waitForActiveShards();
+            assert waitForActiveShards.equals(ActiveShardCount.DEFAULT) == false :
+                "waitForActiveShards must not be DEFAULT on the request object, instead it should be NONE";
+            if (waitForActiveShards.equals(ActiveShardCount.ALL)
+                    && response.getUnassignedShards() == 0
+                    && response.getInitializingShards() == 0) {
+                // if we are waiting for all shards to be active, then the num of unassigned and num of initializing shards must be 0
+                waitForCounter++;
+            } else if (waitForActiveShards.enoughShardsActive(response.getActiveShards())) {
+                // there are enough active shards to meet the requirements of the request
+                waitForCounter++;
+            }
+        }
         if (request.indices() != null && request.indices().length > 0) {
             try {
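For orientation, a condensed sketch of the shape these two hunks fit into (a summary under stated assumptions, not part of the patch): the action first counts how many wait conditions the request actually set, then counts how many are currently satisfied, and health returns once the two counts match or the timeout fires.

    // 'request' and 'response' come from the enclosing action; the total is illustrative.
    int waitFor = 5;                                            // one slot per supported wait condition
    if (request.waitForStatus() == null) waitFor--;             // unset conditions don't count
    if (request.waitForNoRelocatingShards() == false) waitFor--;
    if (request.waitForActiveShards().equals(ActiveShardCount.NONE)) waitFor--;
    // ... later, on each cluster-state observation:
    int waitForCounter = 0;
    if (request.waitForNoRelocatingShards() && response.getRelocatingShards() == 0) waitForCounter++;
    // the health call completes when waitForCounter == waitFor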
ActiveShardCount.java

@@ -120,9 +120,25 @@ public final class ActiveShardCount implements Writeable {
         }
     }

+    /**
+     * Returns true iff the given number of active shards is enough to meet
+     * the required shard count represented by this instance. This method
+     * should only be invoked with {@link ActiveShardCount} objects created
+     * from {@link #from(int)}, or {@link #NONE} or {@link #ONE}.
+     */
+    public boolean enoughShardsActive(final int activeShardCount) {
+        if (this.value < 0) {
+            throw new IllegalStateException("not enough information to resolve to shard count");
+        }
+        if (activeShardCount < 0) {
+            throw new IllegalArgumentException("activeShardCount cannot be negative");
+        }
+        return this.value <= activeShardCount;
+    }
+
     /**
      * Returns true iff the given cluster state's routing table contains enough active
-     * shards to meet the required shard count represented by this instance.
+     * shards for the given index to meet the required shard count represented by this instance.
      */
     public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) {
         if (this == ActiveShardCount.NONE) {
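A small illustrative use of the new value-based check, with made-up numbers; per the javadoc it is only meaningful for counted instances:

    ActiveShardCount required = ActiveShardCount.from(3);
    boolean enough = required.enoughShardsActive(5);   // true: 3 <= 5
    boolean tooFew = required.enoughShardsActive(2);   // false: 3 > 2
    // ALL and DEFAULT cannot be resolved to a number without index metadata,
    // so enoughShardsActive(int) throws IllegalStateException for them (see the tests below).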
RestClusterHealthAction.java

@@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.admin.cluster;

 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.common.Priority;
@@ -57,9 +58,17 @@ public class RestClusterHealthAction extends BaseRestHandler {
         if (waitForStatus != null) {
             clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
         }
-        clusterHealthRequest.waitForRelocatingShards(
-            request.paramAsInt("wait_for_relocating_shards", clusterHealthRequest.waitForRelocatingShards()));
-        clusterHealthRequest.waitForActiveShards(request.paramAsInt("wait_for_active_shards", clusterHealthRequest.waitForActiveShards()));
+        clusterHealthRequest.waitForNoRelocatingShards(
+            request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
+        if (request.hasParam("wait_for_relocating_shards")) {
+            // wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
+            throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
+                "use wait_for_no_relocating_shards [true/false] instead");
+        }
+        String waitForActiveShards = request.param("wait_for_active_shards");
+        if (waitForActiveShards != null) {
+            clusterHealthRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
+        }
         clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
         if (request.param("wait_for_events") != null) {
             clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
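At the REST layer the removed numeric parameter now fails fast with a clear error instead of being silently ignored, and `wait_for_active_shards` accepts either a number or the token `all` via `ActiveShardCount.parseString`. For example (an illustrative request, not taken from the patch), `GET /_cluster/health?wait_for_no_relocating_shards=true&wait_for_active_shards=all` is the new form, while `GET /_cluster/health?wait_for_relocating_shards=0` now returns the `IllegalArgumentException` message above.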
ActiveShardCountTests.java

@@ -146,6 +146,25 @@ public class ActiveShardCountTests extends ESTestCase {
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
     }

+    public void testEnoughShardsActiveValueBased() {
+        // enough shards active case
+        int threshold = randomIntBetween(1, 50);
+        ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, threshold));
+        assertTrue(waitForActiveShards.enoughShardsActive(randomIntBetween(threshold, 50)));
+        // not enough shards active
+        waitForActiveShards = ActiveShardCount.from(randomIntBetween(threshold, 50));
+        assertFalse(waitForActiveShards.enoughShardsActive(randomIntBetween(0, threshold - 1)));
+        // wait for zero shards should always pass
+        assertTrue(ActiveShardCount.from(0).enoughShardsActive(randomIntBetween(0, 50)));
+        // invalid values
+        Exception e = expectThrows(IllegalStateException.class, () -> ActiveShardCount.ALL.enoughShardsActive(randomIntBetween(0, 50)));
+        assertEquals("not enough information to resolve to shard count", e.getMessage());
+        e = expectThrows(IllegalStateException.class, () -> ActiveShardCount.DEFAULT.enoughShardsActive(randomIntBetween(0, 50)));
+        assertEquals("not enough information to resolve to shard count", e.getMessage());
+        e = expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.NONE.enoughShardsActive(randomIntBetween(-10, -1)));
+        assertEquals("activeShardCount cannot be negative", e.getMessage());
+    }
+
     private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) {
         final String indexName = "test-idx";
         final int numberOfShards = randomIntBetween(1, 5);
AwarenessAllocationIT.java

@@ -79,7 +79,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
         assertThat(awaitBusy(
             () -> {
                 logger.info("--> waiting for no relocation");
-                ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForRelocatingShards(0).get();
+                ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForNoRelocatingShards(true).get();
                 if (clusterHealth.isTimedOut()) {
                     return false;
                 }
@@ -131,7 +131,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
                 .put("index.number_of_replicas", 1)).execute().actionGet();

         logger.info("--> waiting for shards to be allocated");
-        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForRelocatingShards(0).execute().actionGet();
+        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).execute().actionGet();
         assertThat(health.isTimedOut(), equalTo(false));

         ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
@@ -166,7 +166,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
         client().admin().indices().prepareCreate("test")
                 .setSettings(Settings.builder().put("index.number_of_shards", 5)
                         .put("index.number_of_replicas", 1)).execute().actionGet();
-        ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").setWaitForRelocatingShards(0).execute().actionGet();
+        ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").setWaitForNoRelocatingShards(true).execute().actionGet();
         assertThat(health.isTimedOut(), equalTo(false));
         ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
         ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
@@ -186,7 +186,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
         health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet();
         assertThat(health.isTimedOut(), equalTo(false));
         client().admin().cluster().prepareReroute().get();
-        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
+        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();

         assertThat(health.isTimedOut(), equalTo(false));
         clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
@@ -208,7 +208,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
         health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet();
         assertThat(health.isTimedOut(), equalTo(false));
         client().admin().cluster().prepareReroute().get();
-        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
+        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();

         assertThat(health.isTimedOut(), equalTo(false));
         clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
@@ -229,7 +229,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
         assertThat(counts.containsKey(noZoneNode), equalTo(false));
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build()).get();

-        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
+        health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();

         assertThat(health.isTimedOut(), equalTo(false));
         clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
ClusterRerouteIT.java

@@ -138,7 +138,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
         assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_2).getId()).iterator().next().state(), equalTo(ShardRoutingState.INITIALIZING));


-        healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForRelocatingShards(0).execute().actionGet();
+        healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForNoRelocatingShards(true).execute().actionGet();
         assertThat(healthResponse.isTimedOut(), equalTo(false));

         logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
@@ -335,7 +335,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
             assertAcked(client().admin().cluster().prepareReroute()
                 .add(new MoveAllocationCommand("test-blocks", 0, nodesIds.get(toggle % 2), nodesIds.get(++toggle % 2))));

-            ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForRelocatingShards(0).execute().actionGet();
+            ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForNoRelocatingShards(true).execute().actionGet();
             assertThat(healthResponse.isTimedOut(), equalTo(false));
         } finally {
             disableIndexBlock("test-blocks", blockSetting);
ZenDiscoveryIT.java

@@ -120,7 +120,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
         ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
                 .setWaitForEvents(Priority.LANGUID)
                 .setWaitForNodes("4")
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(clusterHealthResponse.isTimedOut(), is(false));
ShardInfoIT.java

@@ -136,7 +136,7 @@ public class ShardInfoIT extends ESIntegTestCase {
         assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

         ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(healthResponse.isTimedOut(), equalTo(false));
GatewayIndexStateIT.java

@@ -408,7 +408,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
                     .health(Requests.clusterHealthRequest()
                         .waitForGreenStatus()
                         .waitForEvents(Priority.LANGUID)
-                        .waitForRelocatingShards(0).waitForNodes("2")).actionGet();
+                        .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
         }
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         IndexMetaData metaData = state.getMetaData().index("test");
@@ -470,7 +470,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
                     .health(Requests.clusterHealthRequest()
                         .waitForGreenStatus()
                         .waitForEvents(Priority.LANGUID)
-                        .waitForRelocatingShards(0).waitForNodes("2")).actionGet();
+                        .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
         }
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         IndexMetaData metaData = state.getMetaData().index("test");
@@ -507,7 +507,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
                     .health(Requests.clusterHealthRequest()
                         .waitForGreenStatus()
                         .waitForEvents(Priority.LANGUID)
-                        .waitForRelocatingShards(0).waitForNodes("2")).actionGet();
+                        .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
         }
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         MetaData metaData = state.getMetaData();
MetaDataWriteDataNodesIT.java

@@ -73,7 +73,7 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {

         logger.debug("relocating index...");
         client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node2)).get();
-        client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get();
+        client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).get();
         ensureGreen();
         assertIndexDirectoryDeleted(node1, resolveIndex);
         assertIndexInMetaState(node2, index);
CorruptedFileIT.java

@@ -176,7 +176,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
         ClusterHealthResponse health = client().admin().cluster()
                 .health(Requests.clusterHealthRequest("test").waitForGreenStatus()
                     .timeout("5m") // sometimes due to cluster rebalacing and random settings default timeout is just not enough.
-                    .waitForRelocatingShards(0)).actionGet();
+                    .waitForNoRelocatingShards(true)).actionGet();
         if (health.isTimedOut()) {
             logger.info("cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
             assertThat("timed out waiting for green state", health.isTimedOut(), equalTo(false));
IndexLifecycleActionIT.java

@@ -93,7 +93,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase {
         // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
         client().admin().cluster().prepareReroute().execute().actionGet();

-        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
+        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForNoRelocatingShards(true)).actionGet();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2));
@@ -130,7 +130,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase {
         // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
         client().admin().cluster().prepareReroute().execute().actionGet();

-        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
+        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForNoRelocatingShards(true)).actionGet();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(3));
@@ -171,7 +171,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase {

         client().admin().cluster().prepareReroute().get();

-        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
+        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
FlushIT.java

@@ -128,7 +128,7 @@ public class FlushIT extends ESIntegTestCase {
         internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, currentNodeName, newNodeName)).get();

         client().admin().cluster().prepareHealth()
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
         for (ShardStats shardStats : indexStats.getShards()) {
IndexPrimaryRelocationIT.java

@@ -76,7 +76,7 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
             client().admin().cluster().prepareReroute()
                 .add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId()))
                 .execute().actionGet();
-            ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).execute().actionGet();
+            ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).execute().actionGet();
             assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
             logger.info("--> [iteration {}] relocation complete", i);
             relocationSource = relocationTarget;
UpdateNumberOfReplicasIT.java

@@ -86,7 +86,7 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase {
         allowNodes("test", 3);

         logger.info("Running Cluster Health");
-        clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes(">=3").execute().actionGet();
+        clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes(">=3").execute().actionGet();
         logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@@ -104,7 +104,7 @@ public class UpdateNumberOfReplicasIT extends ESIntegTestCase {
         assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 0)).get());

         logger.info("Running Cluster Health");
-        clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes(">=3").execute().actionGet();
+        clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes(">=3").execute().actionGet();
         logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
IndicesStoreIntegrationIT.java

@@ -125,7 +125,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         logger.info("--> running cluster_health");
         ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
                 .setWaitForNodes("4")
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));

@@ -158,7 +158,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
             internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_3)).get();
         }
         clusterHealth = client().admin().cluster().prepareHealth()
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));

@@ -215,7 +215,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get();
         shardActiveRequestSent.await();
         ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         logClusterState();
@@ -255,7 +255,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         logger.info("--> running cluster_health");
         ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
                 .setWaitForNodes("3")
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));

@@ -270,7 +270,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         clusterHealth = client().admin().cluster().prepareHealth()
                 .setWaitForGreenStatus()
                 .setWaitForNodes("2")
-                .setWaitForRelocatingShards(0)
+                .setWaitForNoRelocatingShards(true)
                 .get();
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
@@ -313,7 +313,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
                 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
                 .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "_name", node4)
         ));
-        assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setWaitForGreenStatus().setWaitForNodes("5").get().isTimedOut());
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).setWaitForGreenStatus().setWaitForNodes("5").get().isTimedOut());

         // disable allocation to control the situation more easily
         assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
FullRollingRestartIT.java

@@ -78,7 +78,7 @@ public class FullRollingRestartIT extends ESIntegTestCase {
         internalCluster().startNodesAsync(2, settings).get();

         // make sure the cluster state is green, and all has been recovered
-        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3"));
+        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));

         logger.info("--> add two more nodes");
         internalCluster().startNodesAsync(2, settings).get();
@@ -87,7 +87,7 @@ public class FullRollingRestartIT extends ESIntegTestCase {
         setMinimumMasterNodes(3);

         // make sure the cluster state is green, and all has been recovered
-        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("5"));
+        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("5"));

         logger.info("--> refreshing and checking data");
         refresh();
@@ -98,14 +98,14 @@ public class FullRollingRestartIT extends ESIntegTestCase {
         // now start shutting nodes down
         internalCluster().stopRandomDataNode();
         // make sure the cluster state is green, and all has been recovered
-        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("4"));
+        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("4"));

         // going down to 3 nodes. note that the min_master_node may not be in effect when we shutdown the 4th
         // node, but that's OK as it is set to 3 before.
         setMinimumMasterNodes(2);
         internalCluster().stopRandomDataNode();
         // make sure the cluster state is green, and all has been recovered
-        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3"));
+        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));

         logger.info("--> stopped two nodes, verifying data");
         refresh();
@@ -116,14 +116,14 @@ public class FullRollingRestartIT extends ESIntegTestCase {
         // closing the 3rd node
         internalCluster().stopRandomDataNode();
         // make sure the cluster state is green, and all has been recovered
-        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("2"));
+        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("2"));

         // closing the 2nd node
         setMinimumMasterNodes(1);
         internalCluster().stopRandomDataNode();

         // make sure the cluster state is yellow, and all has been recovered
-        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForYellowStatus().setWaitForRelocatingShards(0).setWaitForNodes("1"));
+        assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForYellowStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("1"));

         logger.info("--> one node left, verifying data");
         refresh();
RecoveryWhileUnderLoadIT.java

@@ -193,7 +193,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
             allowNodes("test", 4);

             logger.info("--> waiting for GREEN health status ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForRelocatingShards(0));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNoRelocatingShards(true));

             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
             waitForDocs(totalNumDocs, indexer);
@@ -204,23 +204,23 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
             logger.info("--> allow 3 nodes for index [test] ...");
             allowNodes("test", 3);
             logger.info("--> waiting for relocations ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));

             logger.info("--> allow 2 nodes for index [test] ...");
             allowNodes("test", 2);
             logger.info("--> waiting for relocations ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));

             logger.info("--> allow 1 nodes for index [test] ...");
             allowNodes("test", 1);
             logger.info("--> waiting for relocations ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));

             logger.info("--> marking and waiting for indexing threads to stop ...");
             indexer.stop();
             logger.info("--> indexing threads stopped");

-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));

             logger.info("--> refreshing the index");
             refreshAndAssert();
RelocationIT.java

@@ -136,9 +136,9 @@ public class RelocationIT extends ESIntegTestCase {
                 .add(new MoveAllocationCommand("test", 0, node_1, node_2))
                 .execute().actionGet();

-        clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
+        clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-        clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
+        clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

         logger.info("--> verifying count again...");
@@ -199,9 +199,9 @@ public class RelocationIT extends ESIntegTestCase {
                     logger.debug("--> flushing");
                     client().admin().indices().prepareFlush().get();
                 }
-                ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
+                ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
                 assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-                clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
+                clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
                 assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
                 indexer.pauseIndexing();
                 logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
@@ -332,7 +332,7 @@ public class RelocationIT extends ESIntegTestCase {
             indexRandom(true, true, builders2);

             // verify cluster was finished.
-            assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setWaitForEvents(Priority.LANGUID).setTimeout("30s").get().isTimedOut());
+            assertFalse(client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).setWaitForEvents(Priority.LANGUID).setTimeout("30s").get().isTimedOut());
             logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);

             logger.debug("--> verifying all searches return the same number of docs");
SearchWhileRelocatingIT.java

@@ -113,7 +113,7 @@ public class SearchWhileRelocatingIT extends ESIntegTestCase {
                 threads[j].join();
             }
             // this might time out on some machines if they are really busy and you hit lots of throttling
-            ClusterHealthResponse resp = client().admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForRelocatingShards(0).setWaitForEvents(Priority.LANGUID).setTimeout("5m").get();
+            ClusterHealthResponse resp = client().admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForNoRelocatingShards(true).setWaitForEvents(Priority.LANGUID).setTimeout("5m").get();
             assertNoTimeout(resp);
             // if we hit only non-critical exceptions we make sure that the post search works
             if (!nonCriticalExceptions.isEmpty()) {
TransportSearchFailuresIT.java

@@ -86,7 +86,7 @@ public class TransportSearchFailuresIT extends ESIntegTestCase {
             ClusterHealthResponse clusterHealth = client()
                     .admin()
                     .cluster()
-                    .health(clusterHealthRequest("test").waitForYellowStatus().waitForRelocatingShards(0)
+                    .health(clusterHealthRequest("test").waitForYellowStatus().waitForNoRelocatingShards(true)
                             .waitForActiveShards(test.totalNumShards)).actionGet();
             logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
             assertThat(clusterHealth.isTimedOut(), equalTo(false));
TribeIT.java

@@ -32,14 +32,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
@@ -408,7 +406,7 @@ public class TribeIT extends ESIntegTestCase {

     private void ensureGreen(TestCluster testCluster) {
         ClusterHealthResponse actionGet = testCluster.client().admin().cluster()
-            .health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
+            .health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet();
         if (actionGet.isTimedOut()) {
             logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
             assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
docs: cluster health API

@@ -82,14 +82,14 @@ The cluster health API accepts the following request parameters:
     provided or better, i.e. `green` > `yellow` > `red`. By default, will not
     wait for any status.

-`wait_for_relocating_shards`::
-    A number controlling to how many relocating
-    shards to wait for. Usually will be `0` to indicate to wait till all
-    relocations have happened. Defaults to not wait.
+`wait_for_no_relocating_shards`::
+    A boolean value which controls whether to wait (until the timeout provided)
+    for the cluster to have no shard relocations. Defaults to false, which means
+    it will not wait for relocating shards.

 `wait_for_active_shards`::
-    A number controlling to how many active
-    shards to wait for. Defaults to not wait.
+    A number controlling to how many active shards to wait for, `all` to wait
+    for all shards in the cluster to be active, or `0` to not wait. Defaults to `0`.

 `wait_for_nodes`::
     The request waits until the specified number `N` of
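An illustrative call combining the revised parameters (the values are made up, not taken from the patch): `GET _cluster/health?wait_for_no_relocating_shards=true&wait_for_active_shards=all&timeout=50s` waits, for up to 50 seconds, until relocation has finished and every shard copy in the cluster is active.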
docs: analyze API ([docs] Remove coming in 2.0.0)

@@ -136,8 +136,6 @@ GET _analyze
 // CONSOLE
 <1> Set "keyword" to output "keyword" attribute only

-coming[2.0.0, body based parameters were added in 2.0.0]
-
 The request returns the following result:

 [source,js]
docs: shrink index

@@ -50,7 +50,7 @@ PUT /my_source_index/_settings
 It can take a while to relocate the source index. Progress can be tracked
 with the <<cat-recovery,`_cat recovery` API>>, or the <<cluster-health,
 `cluster health` API>> can be used to wait until all shards have relocated
-with the `wait_for_relocating_shards` parameter.
+with the `wait_for_no_relocating_shards` parameter.

 [float]
 === Shrinking an index
docs: Java API migration notes

@@ -379,6 +379,18 @@ in favor of using `addTokenFilter(String)`/`addTokenFilter(Map)` and `addCharFil
 The `setTokenFilters(String...)` and `setCharFilters(String...)` methods have been removed
 in favor of using `addTokenFilter(String)`/`addTokenFilter(Map)` and `addCharFilter(String)`/`addCharFilter(Map)` each filters

+==== ClusterHealthRequest
+
+The `waitForRelocatingShards(int)` method has been removed in favor of `waitForNoRelocatingShards(boolean)`
+which instead uses a boolean flag to denote whether the cluster health operation should wait for there to
+be no relocating shards in the cluster before returning.
+
+==== ClusterHealthRequestBuilder
+
+The `setWaitForRelocatingShards(int)` method has been removed in favor of `setWaitForNoRelocatingShards(boolean)`
+which instead uses a boolean flag to denote whether the cluster health operation should wait for there to
+be no relocating shards in the cluster before returning.
+
 ==== BlobContainer Interface for Snapshot/Restore

 Some methods have been removed from the `BlobContainer` interface for Snapshot/Restore repositories. In particular,
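A minimal before/after sketch for client code affected by this migration, assuming `request` and `builder` handles from either API style:

    // before (no longer compiles):
    // request.waitForRelocatingShards(0);
    // builder.setWaitForRelocatingShards(0);

    // after:
    request.waitForNoRelocatingShards(true);
    builder.setWaitForNoRelocatingShards(true);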
docs: REST API migration notes

@@ -91,3 +91,10 @@ The `PUT /_search/template/{id}/_create` endpoint that previously allowed to cre

 Some REST endpoints (e.g., cluster update index settings) supported detecting content in the Java
 properties format (line-delimited key=value pairs). This support has been removed.
+
+==== `wait_for_relocating_shards` is now `wait_for_no_relocating_shards` in `/_cluster/health`
+
+The `wait_for_relocating_shards` parameter that used to take a number is now simply a boolean
+flag `wait_for_no_relocating_shards`, which if set to true, means the request will wait (up
+until the configured timeout) for the cluster to have no shard relocations before returning.
+Defaults to false, which means the operation will not wait.
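In practice, a request that previously sent `wait_for_relocating_shards=0` should now send `wait_for_no_relocating_shards=true`; as the REST handler change above shows, the old parameter is rejected with an error rather than silently ignored.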
docs: decay function scoring (clarify scale is applied at origin+offset)

@@ -318,7 +318,7 @@ In the above example, the field is a <<geo-point,`geo_point`>> and origin can be
 math (for example `now-1h`) is supported for origin.

 `scale`::
-    Required for all types. Defines the distance from origin at which the computed
+    Required for all types. Defines the distance from origin + offset at which the computed
     score will equal `decay` parameter. For geo fields: Can be defined as number+unit (1km, 12m,...).
     Default unit is meters. For date fields: Can be defined as a number+unit ("1h", "10d",...).
     Default unit is milliseconds. For numeric field: Any number.
docs: phrase suggester ([doc] Remove leftover from CONSOLE conversion)

@@ -84,7 +84,7 @@ suggester in the same spot you'd use the `term` suggester:

 [source,js]
 --------------------------------------------------
-POST _suggest?pretty -d'
+POST _suggest
 {
   "text": "noble prize",
   "simple_phrase": {
RetryTests.java (reindex module)

@@ -28,7 +28,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.Retry;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -43,7 +42,6 @@ import org.junit.Before;

 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CyclicBarrier;

@@ -63,12 +61,12 @@ public class RetryTests extends ESSingleNodeTestCase {

     private List<CyclicBarrier> blockedExecutors = new ArrayList<>();

-    private boolean useNetty4;
+    private boolean useNetty3;

     @Before
     public void setUp() throws Exception {
         super.setUp();
-        useNetty4 = randomBoolean();
+        useNetty3 = randomBoolean();
         createIndex("source");
         // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
         BulkRequestBuilder bulk = client().prepareBulk();
@@ -112,9 +110,9 @@ public class RetryTests extends ESSingleNodeTestCase {
         settings.put(NetworkModule.HTTP_ENABLED.getKey(), true);
         // Whitelist reindexing from the http host we're going to use
         settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "myself");
-        if (useNetty4) {
-            settings.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME);
-            settings.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME);
+        if (useNetty3) {
+            settings.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME);
+            settings.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME);
         }
         return settings.build();
     }
ESSmokeClientTestCase.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.transport.MockTcpTransportPlugin;
+import org.elasticsearch.transport.Netty3Plugin;
 import org.elasticsearch.transport.Netty4Plugin;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.After;
@@ -91,7 +92,7 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
                 break;
             case 1:
                 plugins = Collections.emptyList();
-                builder.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME);
+                builder.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME);
                 break;
             case 2:
                 plugins = Collections.emptyList();
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.smoketest;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@@ -27,10 +28,19 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThan;

 public class SmokeTestClientIT extends ESSmokeClientTestCase {

+    // needed to avoid the test suite from failing for having no tests
+    // TODO: remove when Netty 4.1.5 is upgraded to Netty 4.1.6 including https://github.com/netty/netty/pull/5778
+    public void testSoThatTestsDoNotFail() {
+
+    }
+
     /**
      * Check that we are connected to a cluster named "elasticsearch".
      */
     public void testSimpleClient() {
+        // TODO: remove when Netty 4.1.5 is upgraded to Netty 4.1.6 including https://github.com/netty/netty/pull/5778
+        assumeFalse("JDK is JDK 9", Constants.JRE_IS_MINIMUM_JAVA9);
         Client client = getClient();

         // START SNIPPET: java-doc-admin-cluster-health
@@ -45,6 +55,8 @@ public class SmokeTestClientIT extends ESSmokeClientTestCase {
      * Create an index and index some docs
      */
     public void testPutDocument() {
+        // TODO: remove when Netty 4.1.5 is upgraded to Netty 4.1.6 including https://github.com/netty/netty/pull/5778
+        assumeFalse("JDK is JDK 9", Constants.JRE_IS_MINIMUM_JAVA9);
         Client client = getClient();

         // START SNIPPET: java-doc-index-doc-simple
@@ -63,5 +75,6 @@ public class SmokeTestClientIT extends ESSmokeClientTestCase {
         assertThat(searchResponse.getHits().getTotalHits(), is(1L));
         // END SNIPPET: java-doc-search-simple
     }

 }
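A standalone sketch (not from this commit) of the skip mechanism used in the two tests above: a failed JUnit assumption marks the test as skipped rather than failed, which keeps the suite green on JDK 9 until the Netty fix lands.

    import org.apache.lucene.util.Constants;

    import static org.junit.Assume.assumeFalse;

    public class Jdk9SkipSketch {
        public void testNettySensitiveBehavior() {
            // Skips (does not fail) when running on JDK 9 or newer.
            assumeFalse("JDK is JDK 9", Constants.JRE_IS_MINIMUM_JAVA9);
            // ... the rest of the test only runs on earlier JDKs ...
        }
    }
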
@@ -13,3 +13,4 @@
 2.3.3
 2.3.4
 2.3.5
+2.4.0

@@ -43,9 +43,9 @@
         "options" : ["immediate", "urgent", "high", "normal", "low", "languid"],
         "description" : "Wait until all currently queued events with the given priority are processed"
       },
-      "wait_for_relocating_shards": {
-        "type" : "number",
-        "description" : "Wait until the specified number of relocating shards is finished"
+      "wait_for_no_relocating_shards": {
+        "type" : "boolean",
+        "description" : "Whether to wait until there are no relocating shards in the cluster"
       },
       "wait_for_status": {
         "type" : "enum",
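For reference, a hedged sketch of the renamed parameter driven from the Java request API (the methods appear elsewhere in this diff; the REST line is the equivalent HTTP call):

    import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
    import org.elasticsearch.client.Requests;

    // REST equivalent: GET _cluster/health?wait_for_no_relocating_shards=true
    static ClusterHealthRequest noRelocationsRequest() {
        return Requests.clusterHealthRequest().waitForNoRelocatingShards(true);
    }
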
@@ -27,6 +27,59 @@
   - do:
       cluster.health:
         wait_for_status: green
+        wait_for_no_relocating_shards: true

   - is_true: cluster_name
   - is_false: timed_out
+  - gte: { number_of_nodes: 1 }
+  - gte: { number_of_data_nodes: 1 }
+  - gt: { active_primary_shards: 0 }
+  - gt: { active_shards: 0 }
+  - gte: { relocating_shards: 0 }
+  - match: { initializing_shards: 0 }
+  - match: { unassigned_shards: 0 }
+  - gte: { number_of_pending_tasks: 0 }
+
+---
+"cluster health basic test, one index with wait for active shards":
+  - do:
+      indices.create:
+        index: test_index
+        body:
+          settings:
+            index:
+              number_of_replicas: 0
+
+  - do:
+      cluster.health:
+        wait_for_active_shards: 1
+        wait_for_no_relocating_shards: true
+
+  - is_true: cluster_name
+  - is_false: timed_out
+  - gte: { number_of_nodes: 1 }
+  - gte: { number_of_data_nodes: 1 }
+  - gt: { active_primary_shards: 0 }
+  - gt: { active_shards: 0 }
+  - gte: { relocating_shards: 0 }
+  - match: { initializing_shards: 0 }
+  - match: { unassigned_shards: 0 }
+  - gte: { number_of_pending_tasks: 0 }
+
+---
+"cluster health basic test, one index with wait for all active shards":
+  - do:
+      indices.create:
+        index: test_index
+        body:
+          settings:
+            index:
+              number_of_replicas: 0
+
+  - do:
+      cluster.health:
+        wait_for_active_shards: all
+        wait_for_no_relocating_shards: true
+
+  - is_true: cluster_name
+  - is_false: timed_out
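A hedged Java-side sketch of the "all" value exercised above (ActiveShardCount is the class the new request field uses; ALL and from(int) are assumed here):

    import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
    import org.elasticsearch.action.support.ActiveShardCount;

    // wait_for_active_shards: all  maps to  ActiveShardCount.ALL;
    // a number such as 1 maps to ActiveShardCount.from(1).
    static ClusterHealthRequest waitForAllActiveShards() {
        return new ClusterHealthRequest("test_index").waitForActiveShards(ActiveShardCount.ALL);
    }
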
@@ -1,10 +1,29 @@
 ---
-"cluster health request timeout":
+"cluster health request timeout on waiting for nodes":
   - do:
       catch: request_timeout
       cluster.health:
         wait_for_nodes: 10
-        timeout: 1s
+        timeout: 1ms

   - is_true: cluster_name
   - is_true: timed_out
+  - gte: { number_of_nodes: 1 }
+  - gte: { number_of_data_nodes: 1 }
+  - match: { active_primary_shards: 0 }
+  - match: { active_shards: 0 }
+  - match: { relocating_shards: 0 }
+  - match: { initializing_shards: 0 }
+  - match: { unassigned_shards: 0 }
+  - gte: { number_of_pending_tasks: 0 }
+
+---
+"cluster health request timeout waiting for active shards":
+  - do:
+      catch: request_timeout
+      cluster.health:
+        timeout: 1ms
+        wait_for_active_shards: 5
+
+  - is_true: cluster_name
+  - is_true: timed_out
@@ -47,7 +47,7 @@
       cluster.health:
         wait_for_status: green
         index: source
-        wait_for_relocating_shards: 0
+        wait_for_no_relocating_shards: true
         wait_for_events: "languid"

 # now we do the actual shrink

@@ -873,7 +873,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
      */
     public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) {
         ClusterHealthResponse actionGet = client().admin().cluster()
-            .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
+            .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet();
         if (actionGet.isTimedOut()) {
             logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
             fail("timed out waiting for green state");
@@ -895,7 +895,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
      * using the cluster health API.
      */
     public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) {
-        ClusterHealthRequest request = Requests.clusterHealthRequest().waitForRelocatingShards(0);
+        ClusterHealthRequest request = Requests.clusterHealthRequest().waitForNoRelocatingShards(true);
         if (status != null) {
             request.waitForStatus(status);
         }
@@ -997,7 +997,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
      */
     public ClusterHealthStatus ensureYellow(String... indices) {
         ClusterHealthResponse actionGet = client().admin().cluster()
-            .health(Requests.clusterHealthRequest(indices).waitForRelocatingShards(0).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet();
+            .health(Requests.clusterHealthRequest(indices).waitForNoRelocatingShards(true).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet();
         if (actionGet.isTimedOut()) {
             logger.info("ensureYellow timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
             assertThat("timed out waiting for yellow", actionGet.isTimedOut(), equalTo(false));
@@ -1106,7 +1106,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
             .setWaitForNodes(Integer.toString(nodeCount))
             .setTimeout(timeValue)
             .setLocal(local)
-            .setWaitForRelocatingShards(0)
+            .setWaitForNoRelocatingShards(true)
             .get();
         if (clusterHealthResponse.isTimedOut()) {
             ClusterStateResponse stateResponse = client(viaNode).admin().cluster().prepareState().get();
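A minimal standalone sketch (names assumed, not from this commit) tying the renamed waits together in one health call, in the style of the ensureGreen() helper above:

    import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.Priority;
    import org.elasticsearch.common.unit.TimeValue;

    static void awaitGreen(Client client, String index) {
        ClusterHealthResponse response = client.admin().cluster().prepareHealth(index)
                .setWaitForGreenStatus()
                .setWaitForEvents(Priority.LANGUID)
                // Replaces the old setWaitForRelocatingShards(0) idiom.
                .setWaitForNoRelocatingShards(true)
                .setTimeout(TimeValue.timeValueSeconds(30))
                .get();
        if (response.isTimedOut()) {
            throw new AssertionError("timed out waiting for green state");
        }
    }
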
@@ -28,7 +28,6 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;

@@ -59,7 +58,6 @@ import java.util.Collections;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
@@ -259,7 +257,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
         // Wait for the index to be allocated so that cluster state updates don't override
         // changes that would have been done locally
         ClusterHealthResponse health = client().admin().cluster()
-            .health(Requests.clusterHealthRequest(index).waitForYellowStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
+            .health(Requests.clusterHealthRequest(index).waitForYellowStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet();
         assertThat(health.getStatus(), lessThanOrEqualTo(ClusterHealthStatus.YELLOW));
         assertThat("Cluster must be a single node cluster", health.getNumberOfDataNodes(), equalTo(1));
         IndicesService instanceFromNode = getInstanceFromNode(IndicesService.class);
@@ -302,7 +300,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
      */
     public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) {
         ClusterHealthResponse actionGet = client().admin().cluster()
-            .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
+            .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet();
         if (actionGet.isTimedOut()) {
             logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
             assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));

@@ -77,7 +77,7 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
     protected void ensureNodeCount(InternalTestCluster cluster) {
         assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
             .setWaitForNodes("" + cluster.size())
-            .setWaitForRelocatingShards(0)
+            .setWaitForNoRelocatingShards(true)
             .get().isTimedOut());
     }

@@ -1,204 +0,0 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.test.disruption;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

import static org.junit.Assert.assertFalse;

public abstract class NetworkPartition implements ServiceDisruptionScheme {

    protected final Logger logger = Loggers.getLogger(getClass());

    final Set<String> nodesSideOne;
    final Set<String> nodesSideTwo;
    volatile boolean autoExpand;
    protected final Random random;
    protected volatile InternalTestCluster cluster;
    protected volatile boolean activeDisruption = false;

    public NetworkPartition(Random random) {
        this.random = new Random(random.nextLong());
        nodesSideOne = new HashSet<>();
        nodesSideTwo = new HashSet<>();
        autoExpand = true;
    }

    public NetworkPartition(String node1, String node2, Random random) {
        this(random);
        nodesSideOne.add(node1);
        nodesSideTwo.add(node2);
        autoExpand = false;
    }

    public NetworkPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
        this(random);
        this.nodesSideOne.addAll(nodesSideOne);
        this.nodesSideTwo.addAll(nodesSideTwo);
        autoExpand = false;
    }

    public Collection<String> getNodesSideOne() {
        return Collections.unmodifiableCollection(nodesSideOne);
    }

    public Collection<String> getNodesSideTwo() {
        return Collections.unmodifiableCollection(nodesSideTwo);
    }

    public Collection<String> getMajoritySide() {
        if (nodesSideOne.size() >= nodesSideTwo.size()) {
            return getNodesSideOne();
        } else {
            return getNodesSideTwo();
        }
    }

    public Collection<String> getMinoritySide() {
        if (nodesSideOne.size() >= nodesSideTwo.size()) {
            return getNodesSideTwo();
        } else {
            return getNodesSideOne();
        }
    }

    @Override
    public void applyToCluster(InternalTestCluster cluster) {
        this.cluster = cluster;
        if (autoExpand) {
            for (String node : cluster.getNodeNames()) {
                applyToNode(node, cluster);
            }
        }
    }

    @Override
    public void removeFromCluster(InternalTestCluster cluster) {
        stopDisrupting();
    }

    @Override
    public void removeAndEnsureHealthy(InternalTestCluster cluster) {
        removeFromCluster(cluster);
        ensureNodeCount(cluster);
    }

    protected void ensureNodeCount(InternalTestCluster cluster) {
        assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
            .setWaitForNodes("" + cluster.size())
            .setWaitForRelocatingShards(0)
            .get().isTimedOut());
    }

    @Override
    public synchronized void applyToNode(String node, InternalTestCluster cluster) {
        if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) {
            return;
        }
        if (nodesSideOne.isEmpty()) {
            nodesSideOne.add(node);
        } else if (nodesSideTwo.isEmpty()) {
            nodesSideTwo.add(node);
        } else if (random.nextBoolean()) {
            nodesSideOne.add(node);
        } else {
            nodesSideTwo.add(node);
        }
    }

    @Override
    public synchronized void removeFromNode(String node, InternalTestCluster cluster) {
        MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node);
        Set<String> otherSideNodes;
        if (nodesSideOne.contains(node)) {
            otherSideNodes = nodesSideTwo;
            nodesSideOne.remove(node);
        } else if (nodesSideTwo.contains(node)) {
            otherSideNodes = nodesSideOne;
            nodesSideTwo.remove(node);
        } else {
            return;
        }
        for (String node2 : otherSideNodes) {
            MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
            removeDisruption(transportService, transportService2);
        }
    }

    @Override
    public synchronized void testClusterClosed() {

    }

    protected abstract String getPartitionDescription();

    @Override
    public synchronized void startDisrupting() {
        if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) {
            return;
        }
        logger.info("nodes {} will be partitioned from {}. partition type [{}]", nodesSideOne, nodesSideTwo, getPartitionDescription());
        activeDisruption = true;
        for (String node1 : nodesSideOne) {
            MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
            for (String node2 : nodesSideTwo) {
                MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
                applyDisruption(transportService1, transportService2);
            }
        }
    }

    @Override
    public synchronized void stopDisrupting() {
        if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0 || !activeDisruption) {
            return;
        }
        logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo);
        for (String node1 : nodesSideOne) {
            MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
            for (String node2 : nodesSideTwo) {
                MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
                removeDisruption(transportService1, transportService2);
            }
        }
        activeDisruption = false;
    }

    abstract void applyDisruption(MockTransportService transportService1, MockTransportService transportService2);

    protected void removeDisruption(MockTransportService transportService1, MockTransportService transportService2) {
        transportService1.clearRule(transportService2);
        transportService2.clearRule(transportService1);
    }

}
@@ -85,7 +85,7 @@ public abstract class SingleNodeDisruption implements ServiceDisruptionScheme {
     protected void ensureNodeCount(InternalTestCluster cluster) {
         assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
             .setWaitForNodes("" + cluster.size())
-            .setWaitForRelocatingShards(0)
+            .setWaitForNoRelocatingShards(true)
             .get().isTimedOut());
     }
 }

@@ -47,7 +47,8 @@ public class LoggingListener extends RunListener {

     @Override
     public void testRunStarted(Description description) throws Exception {
-        previousPackageLoggingMap = processTestLogging(description.getTestClass().getPackage().getAnnotation(TestLogging.class));
+        Package testClassPackage = description.getTestClass().getPackage();
+        previousPackageLoggingMap = processTestLogging(testClassPackage != null ? testClassPackage.getAnnotation(TestLogging.class) : null);
         previousClassLoggingMap = processTestLogging(description.getAnnotation(TestLogging.class));
     }
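A standalone sketch (plain JDK, nothing from this commit) of the root cause the new null check guards against: Class#getPackage() returns null for classes compiled in the default package.

    // Prints "no package (null)" when Dummy lives in the default package.
    public class PackageNullDemo {
        public static void main(String[] args) throws Exception {
            Class<?> clazz = Class.forName("Dummy"); // assumes a default-package class on the classpath
            Package pkg = clazz.getPackage();        // null for default-package classes
            System.out.println(pkg != null ? pkg.getName() : "no package (null)");
        }
    }
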
@@ -0,0 +1,21 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

class Dummy {
}
@@ -34,6 +34,14 @@ import static org.hamcrest.CoreMatchers.equalTo;

 public class LoggingListenerTests extends ESTestCase {

+    public void testTestRunStartedSupportsClassInDefaultPackage() throws Exception {
+        LoggingListener loggingListener = new LoggingListener();
+        Description description = Description.createTestDescription(Class.forName("Dummy"), "dummy");
+
+        // Will throw an exception without the check for testClassPackage != null in testRunStarted
+        loggingListener.testRunStarted(description);
+    }
+
     public void testCustomLevelPerMethod() throws Exception {
         LoggingListener loggingListener = new LoggingListener();