From 0a4e041ee5af0839b811c74046042e63184451dc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 25 Nov 2014 13:05:00 +0100 Subject: [PATCH] [RECOVERY] Throw IndexShardClosedException if shard is closed Today we throw a generic ElasticsearchException when a recovery is cancled. This causes verbose logging and send shard failures and additional unnecessary cluster state events. We can just throw IndexShardClosedException which prevents the send shard failures --- .../index/shard/IndexShardClosedException.java | 4 ++++ .../indices/recovery/ShardRecoveryHandler.java | 18 +++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java b/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java index fa68f320729..72fe912dd20 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShardClosedException.java @@ -30,4 +30,8 @@ public class IndexShardClosedException extends IllegalIndexShardStateException { public IndexShardClosedException(ShardId shardId, Throwable t) { super(shardId, IndexShardState.CLOSED, "Closed", t); } + + public IndexShardClosedException(ShardId shardId, String message) { + super(shardId, IndexShardState.CLOSED, message); + } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index 71a17c55501..caa0a00e457 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -89,7 +89,16 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { private final MappingUpdatedAction mappingUpdatedAction; private final RecoveryResponse response; - private final CancelableThreads cancelableThreads = new CancelableThreads(); + private final CancelableThreads cancelableThreads = new CancelableThreads() { + @Override + protected void fail(String reason) { + if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us + throw new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]"); + } else { + throw new ElasticsearchException("recovery was canceled reason [" + reason + "]"); + } + } + }; public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings, final TransportService transportService, final TimeValue internalActionTimeout, @@ -625,7 +634,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { cancelableThreads.cancel(reason); } - private static final class CancelableThreads { + private static abstract class CancelableThreads { private final Set threads = new HashSet<>(); private boolean canceled = false; private String reason; @@ -637,9 +646,12 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { public synchronized void failIfCanceled() { if (isCanceled()) { - throw new ElasticsearchException("recovery was canceled reason [" + reason + "]"); + fail(reason); } } + + protected abstract void fail(String reason); + private synchronized void add() { failIfCanceled(); threads.add(Thread.currentThread());