don't through fail to send exception on a the generic thread - it may be shut down already and will an exception

This commit is contained in:
Boaz Leskes 2016-02-14 21:13:20 +01:00
parent 335379046a
commit 76465ec37a
3 changed files with 45 additions and 46 deletions

View File

@ -52,7 +52,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -68,6 +67,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
@ -1018,38 +1018,36 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
logger.warn("[{}] {}", exp, shardId, message);
shardStateAction.shardFailed(
shard,
indexShardReference.routingEntry(),
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}
@Override
public void onFailure(Throwable shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
String message = "unknown";
try {
ShardRouting primaryShard = indexShardReference.routingEntry();
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
// we are no longer the primary, fail ourselves and start over
indexShardReference.failShard(message, shardFailedError);
} catch (Throwable t) {
shardFailedError.addSuppressed(t);
}
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
assert (shardFailedError instanceof EsRejectedExecutionException) ||
(shardFailedError.getMessage() != null &&
shardFailedError.getMessage().contains("TransportService is closed")) :
shardFailedError;
shard,
indexShardReference.routingEntry(),
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}
@Override
public void onFailure(Throwable shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
String message = "unknown";
try {
ShardRouting primaryShard = indexShardReference.routingEntry();
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
// we are no longer the primary, fail ourselves and start over
indexShardReference.failShard(message, shardFailedError);
} catch (Throwable t) {
shardFailedError.addSuppressed(t);
}
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
assert shardFailedError instanceof SendRequestTransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onReplicaFailure(nodeId, exp);
}
}
}
}
);
}
}
@ -1133,7 +1131,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
interface IndexShardReference extends Releasable {
boolean isRelocated();
void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
@ -54,6 +53,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -112,7 +112,7 @@ public class ShardStateAction extends AbstractComponent {
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
listener.onFailure(exp.getCause());
listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp);
}
}
});

View File

@ -167,6 +167,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
void setTracerLogExclude(List<String> tracelLogExclude) {
this.tracelLogExclude = tracelLogExclude.toArray(Strings.EMPTY_ARRAY);
}
@Override
protected void doStart() {
adapter.rxMetric.clear();
@ -326,12 +327,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(sendRequestException);
}
});
holderToNotify.handler().handleException(sendRequestException);
}
}
}
@ -405,10 +401,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
/**
* Registers a new request handler
* @param action The action the request handler is associated with
*
* @param action The action the request handler is associated with
* @param requestFactory a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
@ -417,11 +414,12 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
/**
* Registers a new request handler
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
*
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param handler The handler itself that implements the request handling
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
@ -729,6 +727,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final static class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final TransportResponseHandler<T> delegate;
private final ThreadContext.StoredContext threadContext;
private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler<T> delegate) {
this.delegate = delegate;
this.threadContext = threadContext;
@ -766,7 +765,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final ThreadPool threadPool;
public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId,
TransportServiceAdapter adapter, ThreadPool threadPool) {
TransportServiceAdapter adapter, ThreadPool threadPool) {
this.logger = logger;
this.localNode = localNode;
this.action = action;