Search on a shard group while relocation final flip happens might fail

single shard read operations should have the same override exception logic as search and broadcast

relates to #3427
This commit is contained in:
Shay Banon 2013-08-02 09:55:22 +02:00
parent 343871fcf5
commit a8dcfa5deb
6 changed files with 85 additions and 110 deletions

View File

@ -20,11 +20,11 @@
package org.elasticsearch.action.search.type; package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -37,9 +37,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList; import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceListener;
@ -337,29 +334,12 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
} else { } else {
// the failure is already present, try and not override it with an exception that is less meaningless // the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state // for example, getting illegal shard state
if (isOverrideException(t)) { if (TransportActions.isReadOverrideException(t)) {
shardFailures.set(shardIndex, new ShardSearchFailure(t)); shardFailures.set(shardIndex, new ShardSearchFailure(t));
} }
} }
} }
protected boolean isOverrideException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof IllegalIndexShardStateException) {
return false;
}
if (actual instanceof IndexMissingException) {
return false;
}
if (actual instanceof IndexShardMissingException) {
return false;
}
if (actual instanceof NoShardAvailableActionException) {
return false;
}
return false;
}
/** /**
* Releases shard targets that are not used in the docsIdsToLoad. * Releases shard targets that are not used in the docsIdsToLoad.
*/ */

View File

@ -0,0 +1,58 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
/**
*/
public class TransportActions {
public static boolean isShardNotAvailableException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof IllegalIndexShardStateException) {
return true;
}
if (actual instanceof IndexMissingException) {
return true;
}
if (actual instanceof IndexShardMissingException) {
return true;
}
if (actual instanceof NoShardAvailableActionException) {
return true;
}
return false;
}
/**
* If a failure is already present, should this failure override it or not for read operations.
*/
public static boolean isReadOverrideException(Throwable t) {
if (isShardNotAvailableException(t)) {
return false;
}
return true;
}
}

View File

@ -20,10 +20,10 @@
package org.elasticsearch.action.support.broadcast; package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -34,9 +34,6 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
@ -101,40 +98,12 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} }
/** /**
* Override this method to ignore specific exception, note, the result should be OR'ed with the call * Should non active routing shard state be ignore or not.
* to super#ignoreException since there is additional logic here....
*/
protected boolean ignoreException(Throwable t) {
if (ignoreIllegalShardState()) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof IllegalIndexShardStateException) {
return true;
}
if (actual instanceof IndexMissingException) {
return true;
}
if (actual instanceof IndexShardMissingException) {
return true;
}
}
return false;
}
/**
* Should non active routing shard state be ignore or node, defaults to false.
*/ */
protected boolean ignoreNonActiveExceptions() { protected boolean ignoreNonActiveExceptions() {
return false; return false;
} }
/**
* Should the API ignore illegal shard state cases, for example, if the shard is actually missing on the
* target node (cause it hasn't been allocated there for example). Defaults to true.
*/
protected boolean ignoreIllegalShardState() {
return true;
}
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request); protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices); protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
@ -321,9 +290,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
ShardRouting nextShard = shardIt.nextOrNull(); ShardRouting nextShard = shardIt.nextOrNull();
if (nextShard != null) { if (nextShard != null) {
if (t != null) { if (t != null) {
// trace log this exception
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
if (!ignoreException(t)) { if (!TransportActions.isShardNotAvailableException(t)) {
if (shard != null) { if (shard != null) {
logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t); logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else { } else {
@ -338,10 +306,9 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// so there is no meaning to this flag // so there is no meaning to this flag
performOperation(shardIt, nextShard, shardIndex, true); performOperation(shardIt, nextShard, shardIndex, true);
} else { } else {
// e is null when there is no next active....
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
if (t != null) { if (t != null) {
if (!ignoreException(t)) { if (!TransportActions.isShardNotAvailableException(t)) {
if (shard != null) { if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else { } else {
@ -364,10 +331,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
if (!accumulateExceptions()) { if (!accumulateExceptions()) {
return; return;
} }
if (ignoreNonActiveExceptions() && t instanceof NoShardAvailableActionException) {
return; if (ignoreNonActiveExceptions() && TransportActions.isShardNotAvailableException(t)) {
}
if (ignoreException(t)) {
return; return;
} }
@ -388,29 +353,12 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// the failure is already present, try and not override it with an exception that is less meaningless // the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state // for example, getting illegal shard state
if (isOverrideException(t)) { if (TransportActions.isReadOverrideException(t)) {
shardsResponses.set(shardIndex, t); shardsResponses.set(shardIndex, t);
} }
} }
} }
protected boolean isOverrideException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof IllegalIndexShardStateException) {
return false;
}
if (actual instanceof IndexMissingException) {
return false;
}
if (actual instanceof IndexShardMissingException) {
return false;
}
if (actual instanceof NoShardAvailableActionException) {
return false;
}
return false;
}
class TransportHandler extends BaseTransportRequestHandler<Request> { class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override @Override

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -39,11 +40,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
@ -162,26 +160,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
protected boolean retryPrimaryException(Throwable e) { protected boolean retryPrimaryException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e); return TransportActions.isShardNotAvailableException(e);
return cause instanceof IndexShardMissingException ||
cause instanceof IllegalIndexShardStateException ||
cause instanceof IndexMissingException;
} }
/** /**
* Should an exception be ignored when the operation is performed on the replica. * Should an exception be ignored when the operation is performed on the replica.
*/ */
boolean ignoreReplicaException(Throwable e) { boolean ignoreReplicaException(Throwable e) {
if (TransportActions.isShardNotAvailableException(e)) {
return true;
}
Throwable cause = ExceptionsHelper.unwrapCause(e); Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IllegalIndexShardStateException) {
return true;
}
if (cause instanceof IndexMissingException) {
return true;
}
if (cause instanceof IndexShardMissingException) {
return true;
}
if (cause instanceof ConnectTransportException) { if (cause instanceof ConnectTransportException) {
return true; return true;
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -41,7 +42,7 @@ import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
/** /**
* * A base class for single shard read operations.
*/ */
public abstract class TransportShardSingleOperationAction<Request extends SingleShardOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> { public abstract class TransportShardSingleOperationAction<Request extends SingleShardOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@ -94,12 +95,10 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
class AsyncSingleAction { class AsyncSingleAction {
private final ActionListener<Response> listener; private final ActionListener<Response> listener;
private final ShardIterator shardIt; private final ShardIterator shardIt;
private final Request request; private final Request request;
private final DiscoveryNodes nodes; private final DiscoveryNodes nodes;
private volatile Throwable lastFailure;
private AsyncSingleAction(Request request, ActionListener<Response> listener) { private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.request = request; this.request = request;
@ -121,7 +120,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
} }
public void start() { public void start() {
perform(null); perform(new NoShardAvailableActionException(shardIt.shardId()));
} }
private void onFailure(ShardRouting shardRouting, Throwable e) { private void onFailure(ShardRouting shardRouting, Throwable e) {
@ -131,10 +130,15 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
perform(e); perform(e);
} }
private void perform(@Nullable final Throwable lastException) { private void perform(@Nullable final Throwable currentFailure) {
Throwable lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull(); final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) { if (shardRouting == null) {
Throwable failure = lastException; Throwable failure = lastFailure;
if (failure == null) { if (failure == null) {
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]"); failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]");
} else { } else {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -51,7 +52,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -112,11 +112,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
@Override @Override
protected boolean retryOnFailure(Throwable e) { protected boolean retryOnFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e); return TransportActions.isShardNotAvailableException(e);
if (e instanceof IllegalIndexShardStateException) {
return true;
}
return false;
} }
@Override @Override