diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java new file mode 100644 index 00000000000..10e2514ef74 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin; + +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.NodesInfoAction; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.opensearch.action.admin.indices.recovery.RecoveryAction; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsAction; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.test.transport.StubbableTransport; +import org.opensearch.transport.ReceiveTimeoutTransportException; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import 
static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.containsString; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) /* Integration tests: each test starts one master-only node and two data-only nodes, makes sends between the two data nodes throw ReceiveTimeoutTransportException for one action via simulateTimeoutAtTransport, and checks the response returned through dataNodeClient(). */ +public class ClientTimeoutIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { /* NOTE(review): the return type reads Collection> here, so its type arguments look truncated in this copy of the patch - confirm against the real file */ + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + + /* Nodes-info answers with three nodes before the stub is installed and with two nodes, the master among them, afterwards. */ public void testNodesInfoTimeout(){ + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + + // Happy case + NodesInfoResponse response = dataNodeClient().admin().cluster().prepareNodesInfo().get(); + assertThat(response.getNodes().size(), equalTo(3)); + + //simulate timeout on bad node. + simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesInfoAction.NAME); + + // One bad data node + response = dataNodeClient().admin().cluster().prepareNodesInfo().get(); + ArrayList nodes = new ArrayList(); + for(NodeInfo node : response.getNodes()) { + nodes.add(node.getNode().getName()); + } + assertThat(response.getNodes().size(), equalTo(2)); + assertThat(nodes.contains(masterNode), is(true)); + } + + /* Same scenario as testNodesInfoTimeout, for nodes-stats. */ public void testNodesStatsTimeout(){ + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + TimeValue timeout = TimeValue.timeValueMillis(1000); /* NOTE(review): timeout is never read in this method */ + + // Happy case + NodesStatsResponse response1 = dataNodeClient().admin().cluster().prepareNodesStats().get(); + assertThat(response1.getNodes().size(), equalTo(3)); + + // One bad data node + simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesStatsAction.NAME); + + NodesStatsResponse response = 
dataNodeClient().admin().cluster().prepareNodesStats().get(); + ArrayList nodes = new ArrayList(); + for(NodeStats node : response.getNodes()) { + nodes.add(node.getNode().getName()); + } + assertThat(response.getNodes().size(), equalTo(2)); + assertThat(nodes.contains(masterNode), is(true)); + } + + /* List-tasks reports three per-node entries before the stub; afterwards the lookup for anotherDataNode is expected to be null. */ public void testListTasksTimeout(){ + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + TimeValue timeout = TimeValue.timeValueMillis(1000); /* NOTE(review): neither timeout nor masterNode is read in this method */ + + // Happy case + ListTasksResponse response1 = dataNodeClient().admin().cluster().prepareListTasks().get(); + assertThat(response1.getPerNodeTasks().keySet().size(), equalTo(3)); + + // One bad data node + simulateTimeoutAtTransport(dataNode, anotherDataNode, NodesStatsAction.NAME); /* NOTE(review): this stubs the nodes-stats action although the test issues a list-tasks request; every other test passes the action that matches its own request - confirm which action was intended */ + + ListTasksResponse response = dataNodeClient().admin().cluster().prepareListTasks().get(); + assertNull(response.getPerNodeTasks().get(anotherDataNode)); /* NOTE(review): anotherDataNode is the value returned by startDataOnlyNode(); confirm getPerNodeTasks() is keyed by that same value, otherwise this lookup is null regardless of the stub */ + } + + /* Recoveries on a four-shard index: all shards succeed before the stub; afterwards half are expected to fail with a ReceiveTimeoutTransportException reason. */ public void testRecoveriesWithTimeout(){ + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + + int numShards = 4; + assertAcked(prepareCreate("test-index", 0, Settings.builder(). + put("number_of_shards", numShards).put("routing.allocation.total_shards_per_node", 2). + put("number_of_replicas", 0))); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + refresh("test-index"); + ensureSearchable("test-index"); + + // Happy case + RecoveryResponse recoveryResponse = dataNodeClient().admin().indices().prepareRecoveries().get(); + assertThat(recoveryResponse.getTotalShards(), equalTo(numShards)); + assertThat(recoveryResponse.getSuccessfulShards(), equalTo(numShards)); + + //simulate timeout on bad node. 
+ simulateTimeoutAtTransport(dataNode, anotherDataNode, RecoveryAction.NAME); + + //verify response with bad node. + recoveryResponse = dataNodeClient().admin().indices().prepareRecoveries().get(); + assertThat(recoveryResponse.getTotalShards(), equalTo(numShards)); + assertThat(recoveryResponse.getSuccessfulShards(), equalTo(numShards/2)); /* assumes the four shards are split two per data node by the total_shards_per_node setting used at index creation - TODO confirm */ + assertThat(recoveryResponse.getFailedShards(), equalTo(numShards/2)); + assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); + } + + /* Indices-stats on a four-shard index: all shards succeed before the stub; afterwards half are expected to fail and the reported doc count is expected to be below numDocs. */ public void testStatsWithTimeout(){ + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + String anotherDataNode = internalCluster().startDataOnlyNode(); + + int numShards = 4; + logger.info("--> creating index"); + assertAcked(prepareCreate("test-index", 0, Settings.builder(). + put("number_of_shards", numShards).put("routing.allocation.total_shards_per_node", 2). + put("number_of_replicas", 0))); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + refresh("test-index"); + ensureSearchable("test-index"); + + //happy case + IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().setDocs(true).get(); + assertThat(indicesStats.getTotalShards(), equalTo(numShards)); + assertThat(indicesStats.getSuccessfulShards(), equalTo(numShards)); + + // simulate timeout on bad node. + simulateTimeoutAtTransport(dataNode, anotherDataNode, IndicesStatsAction.NAME); + + // verify indices state response with bad node. 
+ indicesStats = dataNodeClient().admin().indices().prepareStats().setDocs(true).get(); + assertThat(indicesStats.getTotalShards(), equalTo(numShards)); + assertThat(indicesStats.getFailedShards(), equalTo(numShards/2)); + assertThat(indicesStats.getSuccessfulShards(), equalTo(numShards/2)); + assertThat(indicesStats.getTotal().getDocs().getCount(), lessThan(numDocs)); /* presumably only the shards that answered contribute to the doc count - confirm */ + assertThat(indicesStats.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); + } + + /* Installs one shared send behaviour on both named nodes, each registered against the TransportService of the other node: a send whose action name starts with transportActionName throws ReceiveTimeoutTransportException, every other send is passed through to the connection unchanged. */ private void simulateTimeoutAtTransport(String dataNode, String anotherDataNode, String transportActionName) { + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, + dataNode)); + StubbableTransport.SendRequestBehavior sendBehaviour = (connection, requestId, action, request, options) -> { + if (action.startsWith(transportActionName)) { /* prefix match, so any action name beginning with the given name is affected as well */ + throw new ReceiveTimeoutTransportException(connection.getNode(), action, "simulate timeout"); + } + connection.sendRequest(requestId, action, request, options); + }; + mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, anotherDataNode), sendBehaviour); + MockTransportService mockTransportServiceAnotherNode = ((MockTransportService) internalCluster().getInstance(TransportService.class, + anotherDataNode)); + mockTransportServiceAnotherNode.addSendBehavior(internalCluster().getInstance(TransportService.class, dataNode), sendBehaviour); + + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index e84ef3e92b1..0f98b15ecc8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -34,6 +34,7 @@ package 
org.opensearch.action.admin.indices.stats; import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.opensearch.client.OpenSearchClient; +import org.opensearch.common.unit.TimeValue; /** * A request to get indices level stats. Allow to enable different stats to be returned. @@ -67,6 +68,14 @@ public class IndicesStatsRequestBuilder return this; } + /** + * Sets timeout of request. + */ + public final IndicesStatsRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + /** * Document types to return stats for. Mainly affects {@link #setIndexing(boolean)} when * enabled, returning specific indexing stats for those types. diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java index 0551eb7e65a..457f97acbe9 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java @@ -38,6 +38,7 @@ import org.opensearch.action.IndicesRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; import java.io.IOException; @@ -66,6 +67,9 @@ public class BroadcastRequest> extends return indices; } + private final TimeValue DEFAULT_TIMEOUT_SECONDS = TimeValue.timeValueSeconds(30); // NOTE(review): constant-style name on a per-instance final field; it is only passed as the default argument when the string form of timeout is parsed, so it looks like a candidate for static final - confirm + private TimeValue timeout; // stays null until one of the timeout setters added by this patch is called; the visible hunks add no stream read or write for it - NOTE(review): confirm that is intended + @SuppressWarnings("unchecked") @Override public final Request indices(String... 
indices) { @@ -73,6 +77,22 @@ public class BroadcastRequest> extends return (Request) this; } + public TimeValue timeout() { + return this.timeout; + } + + @SuppressWarnings("unchecked") + public final Request timeout(TimeValue timeout) { + this.timeout = timeout; + return (Request) this; + } + + @SuppressWarnings("unchecked") + public final Request timeout(String timeout) { + this.timeout = TimeValue.parseTimeValue(timeout, DEFAULT_TIMEOUT_SECONDS, getClass().getSimpleName() + ".timeout"); + return (Request) this; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index f2889e76cf3..c0d3a78fec4 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -66,6 +66,7 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; +import org.opensearch.transport.TransportRequestOptions; import java.io.IOException; import java.util.ArrayList; @@ -334,7 +335,12 @@ public abstract class TransportBroadcastByNodeAction() { + TransportRequestOptions transportRequestOptions = TransportRequestOptions.EMPTY; + if (request != null && request.timeout() != null) { + transportRequestOptions = TransportRequestOptions.builder().withTimeout(request.timeout()).build(); + } + transportService.sendRequest( + node, transportNodeBroadcastAction, nodeRequest, transportRequestOptions, new TransportResponseHandler() { @Override public NodeResponse read(StreamInput in) throws IOException { return new NodeResponse(in); diff --git 
a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 7bc9d9dc58c..7e4ff6a6f25 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -61,6 +61,7 @@ public abstract class BaseNodesRequest * will be ignored and this will be used. * */ private DiscoveryNode[] concreteNodes; + private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); /* NOTE(review): constant-style name on a per-instance final field; candidate for static final - confirm */ private TimeValue timeout; @@ -102,7 +103,7 @@ public abstract class BaseNodesRequest @SuppressWarnings("unchecked") public final Request timeout(String timeout) { - this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"); + this.timeout = TimeValue.parseTimeValue(timeout, DEFAULT_TIMEOUT_SECS, getClass().getSimpleName() + ".timeout"); /* the default argument handed to parseTimeValue changes from null to DEFAULT_TIMEOUT_SECS; NOTE(review): callers that relied on the previous null default now presumably get 30s - confirm */ return (Request) this; } public DiscoveryNode[] concreteNodes() { diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index ec381036518..0a65256511b 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -78,6 +78,7 @@ public class RestNodesInfoAction extends BaseRestHandler { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { final NodesInfoRequest nodesInfoRequest = prepareRequest(request); + nodesInfoRequest.timeout(request.param("timeout")); /* passes the raw timeout request parameter to the string overload of timeout */ settingsFilter.addFilterSettingParams(request); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); diff --git 
a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java index 7d114bc2e6b..d8f18c9a37f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -90,6 +90,7 @@ public class RestIndicesStatsAction extends BaseRestHandler { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.timeout(request.param("timeout")); /* passes the raw timeout request parameter to the string overload of timeout */ boolean forbidClosedIndices = request.paramAsBoolean("forbid_closed_indices", true); IndicesOptions defaultIndicesOption = forbidClosedIndices ? indicesStatsRequest.indicesOptions() : IndicesOptions.strictExpandOpen(); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java b/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java index ed288eedd85..319e13b19b7 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java @@ -85,7 +85,8 @@ public abstract class AbstractCatAction extends BaseRestHandler { } static Set RESPONSE_PARAMS = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList("format", "h", "v", "ts", "pri", "bytes", "size", "time", "s"))); + Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "format", "h", "v", "ts", "pri", "bytes", "size", "time", "s", "timeout"))); /* NOTE(review): timeout is registered as a response parameter here while the cat actions in this patch read it through request.param - confirm this registration is the intended one */ @Override protected Set responseParams() { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java index 9bdcf29e8f0..1cfcd5ce413 100644 --- 
a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java @@ -88,6 +88,7 @@ public class RestAllocationAction extends AbstractCatAction { @Override public void processResponse(final ClusterStateResponse state) { NodesStatsRequest statsRequest = new NodesStatsRequest(nodes); + statsRequest.timeout(request.param("timeout")); /* applies the timeout request parameter to the nested nodes-stats request */ statsRequest.clear().addMetric(NodesStatsRequest.Metric.FS.metricName()) .indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java index 13d2be38877..606212baf6e 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestCatRecoveryAction.java @@ -84,6 +84,7 @@ public class RestCatRecoveryAction extends AbstractCatAction { @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index"))); + recoveryRequest.timeout(request.param("timeout")); /* applies the timeout request parameter to the recovery request */ recoveryRequest.detailed(request.paramAsBoolean("detailed", false)); recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false)); recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions())); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java index 0677ad9515f..bafc62c4b25 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodeAttrsAction.java @@ -82,6 +82,7 @@ public class RestNodeAttrsAction extends AbstractCatAction 
{ @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); /* applies the timeout request parameter to the nested nodes-info request */ nodesInfoRequest.clear() .addMetric(NodesInfoRequest.Metric.PROCESS.metricName()); client.admin().cluster().nodesInfo(nodesInfoRequest, new RestResponseListener(channel) { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index 9c1964d9aa5..a9e77c37184 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -116,6 +116,7 @@ public class RestNodesAction extends AbstractCatAction { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); /* the same timeout parameter is applied to the nodes-info request here and to the nodes-stats request further down */ nodesInfoRequest.clear().addMetrics( NodesInfoRequest.Metric.JVM.metricName(), NodesInfoRequest.Metric.OS.metricName(), @@ -125,6 +126,7 @@ public class RestNodesAction extends AbstractCatAction { @Override public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.timeout(request.param("timeout")); nodesStatsRequest.clear().indices(true).addMetrics( NodesStatsRequest.Metric.JVM.metricName(), NodesStatsRequest.Metric.OS.metricName(), diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java index bf354cbb269..89797074dbd 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestPluginsAction.java @@ -81,6 +81,7 @@ public class RestPluginsAction extends AbstractCatAction { 
@Override public void processResponse(final ClusterStateResponse clusterStateResponse) throws Exception { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); /* applies the timeout request parameter to the nested nodes-info request */ nodesInfoRequest.clear() .addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); client.admin().cluster().nodesInfo(nodesInfoRequest, new RestResponseListener(channel) { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java index 993d5b4695e..44f737b4bd9 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java @@ -97,6 +97,7 @@ public class RestThreadPoolAction extends AbstractCatAction { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.timeout(request.param("timeout")); /* the same timeout parameter is applied to the nodes-info request here and to the nodes-stats request further down */ nodesInfoRequest.clear().addMetrics( NodesInfoRequest.Metric.PROCESS.metricName(), NodesInfoRequest.Metric.THREAD_POOL.metricName()); @@ -104,6 +105,7 @@ public class RestThreadPoolAction extends AbstractCatAction { @Override public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.timeout(request.param("timeout")); nodesStatsRequest.clear().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()); client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener(channel) { @Override diff --git a/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 38eba2b3338..941c801ccf1 100644 --- 
a/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -71,6 +71,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ReceiveTimeoutTransportException; import org.opensearch.transport.TestTransportChannel; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; @@ -488,4 +489,60 @@ public class TransportBroadcastByNodeActionTests extends OpenSearchTestCase { assertEquals("failed shards", totalFailedShards, response.getFailedShards()); assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length); } + + /* Starts the action, answers each captured per-node request either with a ReceiveTimeoutTransportException (for the one chosen node id) or with a NodeResponse, then checks the total, successful and failed shard counts of the final response. */ public void testResultWithTimeouts() throws ExecutionException, InterruptedException { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + + ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX}); + Map> map = new HashMap<>(); /* groups the shards of TEST_INDEX by the id of the node currently holding them */ + for (ShardRouting shard : shardIt) { + if (!map.containsKey(shard.currentNodeId())) { + map.put(shard.currentNodeId(), new ArrayList<>()); + } + map.get(shard.currentNodeId()).add(shard); + } + + int totalShards = 0; + int totalSuccessfulShards = 0; + int totalFailedShards = 0; + String failedNodeId = "node_" + randomIntBetween(0, capturedRequests.size()); /* NOTE(review): if the upper bound of randomIntBetween is inclusive, this can yield an id that has no captured request, in which case no timeout is simulated in that run - confirm whether size minus one was intended */ + for (Map.Entry> entry : capturedRequests.entrySet()) { + List exceptions = new ArrayList<>(); + long requestId = entry.getValue().get(0).requestId; + if (entry.getKey().equals(failedNodeId)) { + // simulate node timeout 
+ totalShards += map.get(entry.getKey()).size(); /* every shard on the timed-out node is counted as both total and failed */ + totalFailedShards += map.get(entry.getKey()).size(); + transport.handleError(requestId, + new ReceiveTimeoutTransportException(clusterService.state().getRoutingNodes().node(entry.getKey()).node(), + "indices:admin/test" , "time_out_simulated")); + } else { + List shards = map.get(entry.getKey()); + List shardResults = new ArrayList<>(); + for (ShardRouting shard : shards) { + totalShards++; + if (rarely()) { + // simulate operation failure + totalFailedShards++; + exceptions.add(new BroadcastShardOperationFailedException(shard.shardId(), "operation indices:admin/test failed")); + } else { + shardResults.add(TransportBroadcastByNodeAction.EmptyResult.INSTANCE); + } + } + totalSuccessfulShards += shardResults.size(); + TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(), + shardResults, exceptions); + transport.handleResponse(requestId, nodeResponse); + } + } + + Response response = listener.get(); + assertEquals("total shards", totalShards, response.getTotalShards()); + assertEquals("successful shards", totalSuccessfulShards, response.getSuccessfulShards()); + assertEquals("failed shards", totalFailedShards, response.getFailedShards()); + assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length); + } }