Improve exception handling for river startup

Due to a change in 0.90 some exceptions may show up when starting a river,
even though this is not a problem as the shard has not been initialized
yet and a retry is scheduled.
This commit is contained in:
Alexander Reelsen 2013-10-07 17:02:24 +02:00
parent 19099ab3da
commit abcf8fc0f7
3 changed files with 9 additions and 4 deletions

View File

@ -41,6 +41,8 @@ import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/** /**
* A base class for single shard read operations. * A base class for single shard read operations.
*/ */
@ -139,7 +141,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
final ShardRouting shardRouting = shardIt.nextOrNull(); final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) { if (shardRouting == null) {
Throwable failure = lastFailure; Throwable failure = lastFailure;
if (failure == null) { if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(shardIt.shardId()); failure = new NoShardAvailableActionException(shardIt.shardId());
} else { } else {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {

View File

@ -25,13 +25,11 @@ import com.google.common.collect.Maps;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -55,6 +53,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/** /**
* *
*/ */
@ -278,7 +278,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
// this might happen if the state of the river index has not been propagated yet to this node, which // this might happen if the state of the river index has not been propagated yet to this node, which
// should happen pretty fast since we managed to get the _meta in the RiversRouter // should happen pretty fast since we managed to get the _meta in the RiversRouter
Throwable failure = ExceptionsHelper.unwrapCause(e); Throwable failure = ExceptionsHelper.unwrapCause(e);
if ((failure instanceof NoShardAvailableActionException) || (failure instanceof ClusterBlockException) || (failure instanceof IndexMissingException)) { if (isShardNotAvailableException(failure)) {
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name()); logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
final ActionListener<GetResponse> listener = this; final ActionListener<GetResponse> listener = this;
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() { threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() {

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName; import org.elasticsearch.river.RiverName;
@ -122,6 +123,8 @@ public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> imple
// ignore, we will get it next time // ignore, we will get it next time
} catch (IndexMissingException e) { } catch (IndexMissingException e) {
// ignore, we will get it next time // ignore, we will get it next time
} catch (IllegalIndexShardStateException e) {
// ignore, we will get it next time
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", e, mappingType); logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
} }