From 13d3a3e2572fb3ea0b2c25df30b98647eff7e5ad Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Tue, 25 Aug 2015 12:36:32 +0200 Subject: [PATCH] send response for update request when it timed out --- .../action/UnavailableShardsException.java | 5 +- ...ransportInstanceSingleOperationAction.java | 55 ++- ...ortInstanceSingleOperationActionTests.java | 316 ++++++++++++++++++ 3 files changed, 344 insertions(+), 32 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java diff --git a/core/src/main/java/org/elasticsearch/action/UnavailableShardsException.java b/core/src/main/java/org/elasticsearch/action/UnavailableShardsException.java index dd0968ee4ea..ff31bb715db 100644 --- a/core/src/main/java/org/elasticsearch/action/UnavailableShardsException.java +++ b/core/src/main/java/org/elasticsearch/action/UnavailableShardsException.java @@ -21,6 +21,7 @@ package org.elasticsearch.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -32,8 +33,8 @@ import java.io.IOException; */ public class UnavailableShardsException extends ElasticsearchException { - public UnavailableShardsException(@Nullable ShardId shardId, String message) { - super(buildMessage(shardId, message)); + public UnavailableShardsException(@Nullable ShardId shardId, String message, Object... args) { + super(buildMessage(shardId, message), args); } private static String buildMessage(ShardId shardId, String message) { diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index 2e815da3835..5f4f9420295 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.support.single.instance; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.UnavailableShardsException; @@ -35,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -42,6 +44,7 @@ import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -111,9 +114,8 @@ public abstract class TransportInstanceSingleOperationAction listener) { + AsyncSingleAction(Request request, ActionListener listener) { this.request = request; this.listener = listener; } @@ -123,14 +125,14 @@ public abstract class TransportInstanceSingleOperationAction { + public Request() { + } + } + + public static class Response extends ActionResponse { + public Response() { + } + } + + class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction { + private final Map shards = new HashMap<>(); + + public TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { + super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request); + } + + public Map getResults() { + return shards; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected void shardOperation(Request request, ActionListener listener) { + throw new UnsupportedOperationException("Not implemented in test class"); + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected boolean resolveRequest(ClusterState state, Request request, ActionListener listener) { + return true; + } + + @Override + protected ShardIterator shards(ClusterState clusterState, Request request) { + return clusterState.routingTable().index(request.concreteIndex()).shard(request.shardId).primaryShardIt(); + } + } + + class MyResolver extends IndexNameExpressionResolver { + public MyResolver() { + super(Settings.EMPTY); + } + + @Override + public String[] concreteIndices(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } + + @BeforeClass + public static void startThreadPool() { + THREAD_POOL = new ThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = new TestClusterService(THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL); + transportService.start(); + action = new TestTransportInstanceSingleOperationAction( + Settings.EMPTY, + "indices:admin/test", + transportService, + new ActionFilters(new HashSet()), + new MyResolver(), + Request::new + ); + } + + @AfterClass + public static void destroyThreadPool() { + ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + THREAD_POOL = null; + } + + public void testGlobalBlock() { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + try { + action.new AsyncSingleAction(request, listener).start(); + listener.get(); + fail("expected ClusterBlockException"); + } catch (Throwable t) { + if (ExceptionsHelper.unwrap(t, ClusterBlockException.class) == null) { + logger.info("expected ClusterBlockException but got ", t); + fail("expected ClusterBlockException"); + } + } + } + + public void testBasicRequestWorks() throws InterruptedException, ExecutionException, TimeoutException { + Request request = new Request().index("test"); + request.shardId = 0; + PlainActionFuture listener = new PlainActionFuture<>(); + clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); + action.new AsyncSingleAction(request, listener).start(); + assertThat(transport.capturedRequests().length, equalTo(1)); + transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); + listener.get(); + } + + public void testFailureWithoutRetry() throws Exception { + Request request = new Request().index("test"); + request.shardId = 0; + PlainActionFuture listener = new PlainActionFuture<>(); + clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); + + action.new AsyncSingleAction(request, listener).start(); + assertThat(transport.capturedRequests().length, equalTo(1)); + long requestId = transport.capturedRequests()[0].requestId; + transport.clear(); + // this should not trigger retry or anything and the listener should report exception immediately + transport.handleResponse(requestId, new TransportException("a generic transport exception", new Exception("generic test exception"))); + + try { + // result should return immediately + assertTrue(listener.isDone()); + listener.get(); + fail("this should fail with a transport exception"); + } catch (ExecutionException t) { + if (ExceptionsHelper.unwrap(t, TransportException.class) == null) { + logger.info("expected TransportException but got ", t); + fail("expected and TransportException"); + } + } + } + + public void testSuccessAfterRetryWithClusterStateUpdate() throws Exception { + Request request = new Request().index("test"); + request.shardId = 0; + PlainActionFuture listener = new PlainActionFuture<>(); + boolean local = randomBoolean(); + clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.INITIALIZING)); + action.new AsyncSingleAction(request, listener).start(); + // this should fail because primary not initialized + assertThat(transport.capturedRequests().length, equalTo(0)); + clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); + // this time it should work + assertThat(transport.capturedRequests().length, equalTo(1)); + transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); + listener.get(); + } + + public void testSuccessAfterRetryWithExcpetionFromTransport() throws Exception { + Request request = new Request().index("test"); + request.shardId = 0; + PlainActionFuture listener = new PlainActionFuture<>(); + boolean local = randomBoolean(); + clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); + action.new AsyncSingleAction(request, listener).start(); + assertThat(transport.capturedRequests().length, equalTo(1)); + long requestId = transport.capturedRequests()[0].requestId; + transport.clear(); + DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); + transport.handleResponse(requestId, new ConnectTransportException(node, "test exception")); + // trigger cluster state observer + clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); + assertThat(transport.capturedRequests().length, equalTo(1)); + transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); + listener.get(); + } + + public void testRetryOfAnAlreadyTimedOutRequest() throws Exception { + Request request = new Request().index("test").timeout(new TimeValue(0, TimeUnit.MILLISECONDS)); + request.shardId = 0; + PlainActionFuture listener = new PlainActionFuture<>(); + clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); + action.new AsyncSingleAction(request, listener).start(); + assertThat(transport.capturedRequests().length, equalTo(1)); + long requestId = transport.capturedRequests()[0].requestId; + transport.clear(); + DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); + transport.handleResponse(requestId, new ConnectTransportException(node, "test exception")); + + // wait until the timeout was triggered and we actually tried to send for the second time + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(transport.capturedRequests().length, equalTo(1)); + } + }); + + // let it fail the second time too + requestId = transport.capturedRequests()[0].requestId; + transport.handleResponse(requestId, new ConnectTransportException(node, "test exception")); + try { + // result should return immediately + assertTrue(listener.isDone()); + listener.get(); + fail("this should fail with a transport exception"); + } catch (ExecutionException t) { + if (ExceptionsHelper.unwrap(t, ConnectTransportException.class) == null) { + logger.info("expected ConnectTransportException but got ", t); + fail("expected and ConnectTransportException"); + } + } + } + + public void testUnresolvableRequestDoesNotHang() throws InterruptedException, ExecutionException, TimeoutException { + action = new TestTransportInstanceSingleOperationAction( + Settings.EMPTY, + "indices:admin/test_unresolvable", + transportService, + new ActionFilters(new HashSet()), + new MyResolver(), + Request::new + ) { + @Override + protected boolean resolveRequest(ClusterState state, Request request, ActionListener listener) { + return false; + } + }; + Request request = new Request().index("test"); + request.shardId = 0; + PlainActionFuture listener = new PlainActionFuture<>(); + clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); + action.new AsyncSingleAction(request, listener).start(); + assertThat(transport.capturedRequests().length, equalTo(0)); + try { + listener.get(); + } catch (Throwable t) { + if (ExceptionsHelper.unwrap(t, IllegalStateException.class) == null) { + logger.info("expected IllegalStateException but got ", t); + fail("expected and IllegalStateException"); + } + } + } +} \ No newline at end of file