send response for update request when it timed out
This commit is contained in:
parent
8a590d46b8
commit
13d3a3e257
|
@ -21,6 +21,7 @@ package org.elasticsearch.action;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.HppcMaps;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -32,8 +33,8 @@ import java.io.IOException;
|
|||
*/
|
||||
public class UnavailableShardsException extends ElasticsearchException {
|
||||
|
||||
public UnavailableShardsException(@Nullable ShardId shardId, String message) {
|
||||
super(buildMessage(shardId, message));
|
||||
public UnavailableShardsException(@Nullable ShardId shardId, String message, Object... args) {
|
||||
super(buildMessage(shardId, message), args);
|
||||
}
|
||||
|
||||
private static String buildMessage(ShardId shardId, String message) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.support.single.instance;
|
||||
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
|
@ -35,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -42,6 +44,7 @@ import org.elasticsearch.node.NodeClosedException;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -111,9 +114,8 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
private volatile ClusterStateObserver observer;
|
||||
private ShardIterator shardIt;
|
||||
private DiscoveryNodes nodes;
|
||||
private final AtomicBoolean operationStarted = new AtomicBoolean();
|
||||
|
||||
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
||||
AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
@ -123,14 +125,14 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
doStart();
|
||||
}
|
||||
|
||||
protected boolean doStart() {
|
||||
protected void doStart() {
|
||||
nodes = observer.observedState().nodes();
|
||||
try {
|
||||
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
|
||||
if (blockException != null) {
|
||||
if (blockException.retryable()) {
|
||||
retry(blockException);
|
||||
return false;
|
||||
return;
|
||||
} else {
|
||||
throw blockException;
|
||||
}
|
||||
|
@ -138,13 +140,14 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request));
|
||||
// check if we need to execute, and if not, return
|
||||
if (!resolveRequest(observer.observedState(), request, listener)) {
|
||||
return true;
|
||||
listener.onFailure(new IllegalStateException(LoggerMessageFormat.format("{} request {} could not be resolved", new ShardId(request.index, request.shardId), actionName)));
|
||||
return;
|
||||
}
|
||||
blockException = checkRequestBlock(observer.observedState(), request);
|
||||
if (blockException != null) {
|
||||
if (blockException.retryable()) {
|
||||
retry(blockException);
|
||||
return false;
|
||||
return;
|
||||
} else {
|
||||
throw blockException;
|
||||
}
|
||||
|
@ -152,13 +155,13 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
shardIt = shards(observer.observedState(), request);
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
|
||||
if (shardIt.size() == 0) {
|
||||
retry(null);
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
|
||||
// this transport only make sense with an iterator that returns a single shard routing (like primary)
|
||||
|
@ -169,11 +172,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
|
||||
if (!shard.active()) {
|
||||
retry(null);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!operationStarted.compareAndSet(false, true)) {
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
request.shardId = shardIt.shardId().id();
|
||||
|
@ -197,24 +196,30 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
Throwable cause = exp.unwrapCause();
|
||||
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
|
||||
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
|
||||
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
|
||||
retryOnFailure(exp)) {
|
||||
operationStarted.set(false);
|
||||
// we already marked it as started when we executed it (removed the listener) so pass false
|
||||
// to re-add to the cluster listener
|
||||
retry(null);
|
||||
retry(cause);
|
||||
} else {
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
void retry(final @Nullable Throwable failure) {
|
||||
if (observer.isTimedOut()) {
|
||||
// we running as a last attempt after a timeout has happened. don't retry
|
||||
Throwable listenFailure = failure;
|
||||
if (listenFailure == null) {
|
||||
if (shardIt == null) {
|
||||
listenFailure = new UnavailableShardsException(new ShardId(request.concreteIndex(), -1), "Timeout waiting for [{}], request: {}", request.timeout(), actionName);
|
||||
} else {
|
||||
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(), request.timeout(), actionName);
|
||||
}
|
||||
}
|
||||
listener.onFailure(listenFailure);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -232,17 +237,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
// just to be on the safe side, see if we can start it now?
|
||||
if (!doStart()) {
|
||||
Throwable listenFailure = failure;
|
||||
if (listenFailure == null) {
|
||||
if (shardIt == null) {
|
||||
listenFailure = new UnavailableShardsException(new ShardId(request.concreteIndex(), -1), "Timeout waiting for [" + timeout + "], request: " + request.toString());
|
||||
} else {
|
||||
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + request.toString());
|
||||
}
|
||||
}
|
||||
listener.onFailure(listenFailure);
|
||||
}
|
||||
doStart();
|
||||
}
|
||||
}, request.timeout());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,316 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.instance;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.cluster.TestClusterService;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
|
||||
public class TransportInstanceSingleOperationActionTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool THREAD_POOL;
|
||||
|
||||
private TestClusterService clusterService;
|
||||
private CapturingTransport transport;
|
||||
private TransportService transportService;
|
||||
|
||||
private TestTransportInstanceSingleOperationAction action;
|
||||
|
||||
public static class Request extends InstanceShardOperationRequest<Request> {
|
||||
public Request() {
|
||||
}
|
||||
}
|
||||
|
||||
public static class Response extends ActionResponse {
|
||||
public Response() {
|
||||
}
|
||||
}
|
||||
|
||||
class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction<Request, Response> {
|
||||
private final Map<ShardId, Object> shards = new HashMap<>();
|
||||
|
||||
public TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
|
||||
super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request);
|
||||
}
|
||||
|
||||
public Map<ShardId, Object> getResults() {
|
||||
return shards;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void shardOperation(Request request, ActionListener<Response> listener) {
|
||||
throw new UnsupportedOperationException("Not implemented in test class");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean resolveRequest(ClusterState state, Request request, ActionListener<Response> listener) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ShardIterator shards(ClusterState clusterState, Request request) {
|
||||
return clusterState.routingTable().index(request.concreteIndex()).shard(request.shardId).primaryShardIt();
|
||||
}
|
||||
}
|
||||
|
||||
class MyResolver extends IndexNameExpressionResolver {
|
||||
public MyResolver() {
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] concreteIndices(ClusterState state, IndicesRequest request) {
|
||||
return request.indices();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new ThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
transport = new CapturingTransport();
|
||||
clusterService = new TestClusterService(THREAD_POOL);
|
||||
transportService = new TransportService(transport, THREAD_POOL);
|
||||
transportService.start();
|
||||
action = new TestTransportInstanceSingleOperationAction(
|
||||
Settings.EMPTY,
|
||||
"indices:admin/test",
|
||||
transportService,
|
||||
new ActionFilters(new HashSet<ActionFilter>()),
|
||||
new MyResolver(),
|
||||
Request::new
|
||||
);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroyThreadPool() {
|
||||
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
|
||||
// since static must set to null to be eligible for collection
|
||||
THREAD_POOL = null;
|
||||
}
|
||||
|
||||
public void testGlobalBlock() {
|
||||
Request request = new Request();
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ClusterBlocks.Builder block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
try {
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
listener.get();
|
||||
fail("expected ClusterBlockException");
|
||||
} catch (Throwable t) {
|
||||
if (ExceptionsHelper.unwrap(t, ClusterBlockException.class) == null) {
|
||||
logger.info("expected ClusterBlockException but got ", t);
|
||||
fail("expected ClusterBlockException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testBasicRequestWorks() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = 0;
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
|
||||
listener.get();
|
||||
}
|
||||
|
||||
public void testFailureWithoutRetry() throws Exception {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = 0;
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
// this should not trigger retry or anything and the listener should report exception immediately
|
||||
transport.handleResponse(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
|
||||
|
||||
try {
|
||||
// result should return immediately
|
||||
assertTrue(listener.isDone());
|
||||
listener.get();
|
||||
fail("this should fail with a transport exception");
|
||||
} catch (ExecutionException t) {
|
||||
if (ExceptionsHelper.unwrap(t, TransportException.class) == null) {
|
||||
logger.info("expected TransportException but got ", t);
|
||||
fail("expected and TransportException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSuccessAfterRetryWithClusterStateUpdate() throws Exception {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = 0;
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
boolean local = randomBoolean();
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.INITIALIZING));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
// this should fail because primary not initialized
|
||||
assertThat(transport.capturedRequests().length, equalTo(0));
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
|
||||
// this time it should work
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
|
||||
listener.get();
|
||||
}
|
||||
|
||||
public void testSuccessAfterRetryWithExcpetionFromTransport() throws Exception {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = 0;
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
boolean local = randomBoolean();
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
|
||||
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
|
||||
// trigger cluster state observer
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
|
||||
listener.get();
|
||||
}
|
||||
|
||||
public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
|
||||
Request request = new Request().index("test").timeout(new TimeValue(0, TimeUnit.MILLISECONDS));
|
||||
request.shardId = 0;
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
|
||||
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
|
||||
|
||||
// wait until the timeout was triggered and we actually tried to send for the second time
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
}
|
||||
});
|
||||
|
||||
// let it fail the second time too
|
||||
requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
|
||||
try {
|
||||
// result should return immediately
|
||||
assertTrue(listener.isDone());
|
||||
listener.get();
|
||||
fail("this should fail with a transport exception");
|
||||
} catch (ExecutionException t) {
|
||||
if (ExceptionsHelper.unwrap(t, ConnectTransportException.class) == null) {
|
||||
logger.info("expected ConnectTransportException but got ", t);
|
||||
fail("expected and ConnectTransportException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testUnresolvableRequestDoesNotHang() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
action = new TestTransportInstanceSingleOperationAction(
|
||||
Settings.EMPTY,
|
||||
"indices:admin/test_unresolvable",
|
||||
transportService,
|
||||
new ActionFilters(new HashSet<ActionFilter>()),
|
||||
new MyResolver(),
|
||||
Request::new
|
||||
) {
|
||||
@Override
|
||||
protected boolean resolveRequest(ClusterState state, Request request, ActionListener<Response> listener) {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = 0;
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
clusterService.setState(ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(0));
|
||||
try {
|
||||
listener.get();
|
||||
} catch (Throwable t) {
|
||||
if (ExceptionsHelper.unwrap(t, IllegalStateException.class) == null) {
|
||||
logger.info("expected IllegalStateException but got ", t);
|
||||
fail("expected and IllegalStateException");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue