Add timeout on cat/stats API (#552)

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
This commit is contained in:
Dhwanil Patel 2021-05-20 13:30:32 +05:30 committed by GitHub
parent e0c8b7ea27
commit a3f29b3875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 300 additions and 3 deletions

View File

@ -0,0 +1,193 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.action.admin;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.ReceiveTimeoutTransportException;
import org.opensearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.containsString;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClientTimeoutIT extends OpenSearchIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}
public void testNodesInfoTimeout(){
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();
// Happy case
NodesInfoResponse response = dataNodeClient().admin().cluster().prepareNodesInfo().get();
assertThat(response.getNodes().size(), equalTo(3));
//simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesInfoAction.NAME);
// One bad data node
response = dataNodeClient().admin().cluster().prepareNodesInfo().get();
ArrayList<String> nodes = new ArrayList<String>();
for(NodeInfo node : response.getNodes()) {
nodes.add(node.getNode().getName());
}
assertThat(response.getNodes().size(), equalTo(2));
assertThat(nodes.contains(masterNode), is(true));
}
public void testNodesStatsTimeout(){
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();
TimeValue timeout = TimeValue.timeValueMillis(1000);
// Happy case
NodesStatsResponse response1 = dataNodeClient().admin().cluster().prepareNodesStats().get();
assertThat(response1.getNodes().size(), equalTo(3));
// One bad data node
simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesStatsAction.NAME);
NodesStatsResponse response = dataNodeClient().admin().cluster().prepareNodesStats().get();
ArrayList<String> nodes = new ArrayList<String>();
for(NodeStats node : response.getNodes()) {
nodes.add(node.getNode().getName());
}
assertThat(response.getNodes().size(), equalTo(2));
assertThat(nodes.contains(masterNode), is(true));
}
public void testListTasksTimeout(){
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();
TimeValue timeout = TimeValue.timeValueMillis(1000);
// Happy case
ListTasksResponse response1 = dataNodeClient().admin().cluster().prepareListTasks().get();
assertThat(response1.getPerNodeTasks().keySet().size(), equalTo(3));
// One bad data node
simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesStatsAction.NAME);
ListTasksResponse response = dataNodeClient().admin().cluster().prepareListTasks().get();
assertNull(response.getPerNodeTasks().get(anotherDataNode));
}
public void testRecoveriesWithTimeout(){
internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();
int numShards = 4;
assertAcked(prepareCreate("test-index", 0, Settings.builder().
put("number_of_shards", numShards).put("routing.allocation.total_shards_per_node", 2).
put("number_of_replicas", 0)));
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");
// Happy case
RecoveryResponse recoveryResponse = dataNodeClient().admin().indices().prepareRecoveries().get();
assertThat(recoveryResponse.getTotalShards(), equalTo(numShards));
assertThat(recoveryResponse.getSuccessfulShards(), equalTo(numShards));
//simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, RecoveryAction.NAME);
//verify response with bad node.
recoveryResponse = dataNodeClient().admin().indices().prepareRecoveries().get();
assertThat(recoveryResponse.getTotalShards(), equalTo(numShards));
assertThat(recoveryResponse.getSuccessfulShards(), equalTo(numShards/2));
assertThat(recoveryResponse.getFailedShards(), equalTo(numShards/2));
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}
public void testStatsWithTimeout(){
internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();
int numShards = 4;
logger.info("--> creating index");
assertAcked(prepareCreate("test-index", 0, Settings.builder().
put("number_of_shards", numShards).put("routing.allocation.total_shards_per_node", 2).
put("number_of_replicas", 0)));
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");
//happy case
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().setDocs(true).get();
assertThat(indicesStats.getTotalShards(), equalTo(numShards));
assertThat(indicesStats.getSuccessfulShards(), equalTo(numShards));
// simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, IndicesStatsAction.NAME);
// verify indices state response with bad node.
indicesStats = dataNodeClient().admin().indices().prepareStats().setDocs(true).get();
assertThat(indicesStats.getTotalShards(), equalTo(numShards));
assertThat(indicesStats.getFailedShards(), equalTo(numShards/2));
assertThat(indicesStats.getSuccessfulShards(), equalTo(numShards/2));
assertThat(indicesStats.getTotal().getDocs().getCount(), lessThan(numDocs));
assertThat(indicesStats.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}
private void simulateTimeoutAtTransport(String dataNode, String anotherDataNode, String transportActionName) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class,
dataNode));
StubbableTransport.SendRequestBehavior sendBehaviour = (connection, requestId, action, request, options) -> {
if (action.startsWith(transportActionName)) {
throw new ReceiveTimeoutTransportException(connection.getNode(), action, "simulate timeout");
}
connection.sendRequest(requestId, action, request, options);
};
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, anotherDataNode), sendBehaviour);
MockTransportService mockTransportServiceAnotherNode = ((MockTransportService) internalCluster().getInstance(TransportService.class,
anotherDataNode));
mockTransportServiceAnotherNode.addSendBehavior(internalCluster().getInstance(TransportService.class, dataNode), sendBehaviour);
}
}

View File

@ -34,6 +34,7 @@ package org.opensearch.action.admin.indices.stats;
import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.common.unit.TimeValue;
/**
* A request to get indices level stats. Allow to enable different stats to be returned.
@ -67,6 +68,14 @@ public class IndicesStatsRequestBuilder
return this;
}
/**
* Sets timeout of request.
*/
public final IndicesStatsRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Document types to return stats for. Mainly affects {@link #setIndexing(boolean)} when
* enabled, returning specific indexing stats for those types.

View File

@ -38,6 +38,7 @@ import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import java.io.IOException;
@ -66,6 +67,9 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
return indices;
}
private final TimeValue DEFAULT_TIMEOUT_SECONDS = TimeValue.timeValueSeconds(30);
private TimeValue timeout;
@SuppressWarnings("unchecked")
@Override
public final Request indices(String... indices) {
@ -73,6 +77,22 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
return (Request) this;
}
public TimeValue timeout() {
return this.timeout;
}
@SuppressWarnings("unchecked")
public final Request timeout(TimeValue timeout) {
this.timeout = timeout;
return (Request) this;
}
@SuppressWarnings("unchecked")
public final Request timeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, DEFAULT_TIMEOUT_SECONDS, getClass().getSimpleName() + ".timeout");
return (Request) this;
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -66,6 +66,7 @@ import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportRequestOptions;
import java.io.IOException;
import java.util.ArrayList;
@ -334,7 +335,12 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
TransportRequestOptions transportRequestOptions = TransportRequestOptions.EMPTY;
if (request != null && request.timeout() != null) {
transportRequestOptions = TransportRequestOptions.builder().withTimeout(request.timeout()).build();
}
transportService.sendRequest(
node, transportNodeBroadcastAction, nodeRequest, transportRequestOptions, new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
return new NodeResponse(in);

View File

@ -61,6 +61,7 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);
private TimeValue timeout;
@ -102,7 +103,7 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
@SuppressWarnings("unchecked")
public final Request timeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout");
this.timeout = TimeValue.parseTimeValue(timeout, DEFAULT_TIMEOUT_SECS, getClass().getSimpleName() + ".timeout");
return (Request) this;
}
public DiscoveryNode[] concreteNodes() {

View File

@ -78,6 +78,7 @@ public class RestNodesInfoAction extends BaseRestHandler {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));

View File

@ -90,6 +90,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.timeout(request.param("timeout"));
boolean forbidClosedIndices = request.paramAsBoolean("forbid_closed_indices", true);
IndicesOptions defaultIndicesOption = forbidClosedIndices ? indicesStatsRequest.indicesOptions()
: IndicesOptions.strictExpandOpen();

View File

@ -85,7 +85,8 @@ public abstract class AbstractCatAction extends BaseRestHandler {
}
static Set<String> RESPONSE_PARAMS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("format", "h", "v", "ts", "pri", "bytes", "size", "time", "s")));
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"format", "h", "v", "ts", "pri", "bytes", "size", "time", "s", "timeout")));
@Override
protected Set<String> responseParams() {

View File

@ -88,6 +88,7 @@ public class RestAllocationAction extends AbstractCatAction {
@Override
public void processResponse(final ClusterStateResponse state) {
NodesStatsRequest statsRequest = new NodesStatsRequest(nodes);
statsRequest.timeout(request.param("timeout"));
statsRequest.clear().addMetric(NodesStatsRequest.Metric.FS.metricName())
.indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store));

View File

@ -84,6 +84,7 @@ public class RestCatRecoveryAction extends AbstractCatAction {
@Override
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
recoveryRequest.timeout(request.param("timeout"));
recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));

View File

@ -82,6 +82,7 @@ public class RestNodeAttrsAction extends AbstractCatAction {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.clear()
.addMetric(NodesInfoRequest.Metric.PROCESS.metricName());
client.admin().cluster().nodesInfo(nodesInfoRequest, new RestResponseListener<NodesInfoResponse>(channel) {

View File

@ -116,6 +116,7 @@ public class RestNodesAction extends AbstractCatAction {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.clear().addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
NodesInfoRequest.Metric.OS.metricName(),
@ -125,6 +126,7 @@ public class RestNodesAction extends AbstractCatAction {
@Override
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.clear().indices(true).addMetrics(
NodesStatsRequest.Metric.JVM.metricName(),
NodesStatsRequest.Metric.OS.metricName(),

View File

@ -81,6 +81,7 @@ public class RestPluginsAction extends AbstractCatAction {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) throws Exception {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.clear()
.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
client.admin().cluster().nodesInfo(nodesInfoRequest, new RestResponseListener<NodesInfoResponse>(channel) {

View File

@ -97,6 +97,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.clear().addMetrics(
NodesInfoRequest.Metric.PROCESS.metricName(),
NodesInfoRequest.Metric.THREAD_POOL.metricName());
@ -104,6 +105,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
@Override
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.clear().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName());
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override

View File

@ -71,6 +71,7 @@ import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
import org.opensearch.transport.TestTransportChannel;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
@ -488,4 +489,60 @@ public class TransportBroadcastByNodeActionTests extends OpenSearchTestCase {
assertEquals("failed shards", totalFailedShards, response.getFailedShards());
assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length);
}
public void testResultWithTimeouts() throws ExecutionException, InterruptedException {
Request request = new Request(new String[]{TEST_INDEX});
PlainActionFuture<Response> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX});
Map<String, List<ShardRouting>> map = new HashMap<>();
for (ShardRouting shard : shardIt) {
if (!map.containsKey(shard.currentNodeId())) {
map.put(shard.currentNodeId(), new ArrayList<>());
}
map.get(shard.currentNodeId()).add(shard);
}
int totalShards = 0;
int totalSuccessfulShards = 0;
int totalFailedShards = 0;
String failedNodeId = "node_" + randomIntBetween(0, capturedRequests.size());
for (Map.Entry<String, List<CapturingTransport.CapturedRequest>> entry : capturedRequests.entrySet()) {
List<BroadcastShardOperationFailedException> exceptions = new ArrayList<>();
long requestId = entry.getValue().get(0).requestId;
if (entry.getKey().equals(failedNodeId)) {
// simulate node timeout
totalShards += map.get(entry.getKey()).size();
totalFailedShards += map.get(entry.getKey()).size();
transport.handleError(requestId,
new ReceiveTimeoutTransportException(clusterService.state().getRoutingNodes().node(entry.getKey()).node(),
"indices:admin/test" , "time_out_simulated"));
} else {
List<ShardRouting> shards = map.get(entry.getKey());
List<TransportBroadcastByNodeAction.EmptyResult> shardResults = new ArrayList<>();
for (ShardRouting shard : shards) {
totalShards++;
if (rarely()) {
// simulate operation failure
totalFailedShards++;
exceptions.add(new BroadcastShardOperationFailedException(shard.shardId(), "operation indices:admin/test failed"));
} else {
shardResults.add(TransportBroadcastByNodeAction.EmptyResult.INSTANCE);
}
}
totalSuccessfulShards += shardResults.size();
TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(),
shardResults, exceptions);
transport.handleResponse(requestId, nodeResponse);
}
}
Response response = listener.get();
assertEquals("total shards", totalShards, response.getTotalShards());
assertEquals("successful shards", totalSuccessfulShards, response.getSuccessfulShards());
assertEquals("failed shards", totalFailedShards, response.getFailedShards());
assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length);
}
}