From ac237b269caf2776fa28ed841f768558154bd7a2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 14 Jan 2016 11:30:03 -0500 Subject: [PATCH 1/2] Add handling of channel failures when starting a shard This commit adds handling of channel failures when starting a shard to o.e.c.a.s.ShardStateAction. This means that shard started requests that timeout or occur when there is no master or the master leaves after the request is sent will now be retried from here. The listener for a shard state request will now only be notified upon successful completion of the shard state request, or when a catastrophic non-channel failure occurs. This commit also refactors the handling of shard failure requests so that the two shard state actions of shard failure and shard started now share the same channel-retry and notification logic. --- .../TransportReplicationAction.java | 2 +- .../action/shard/ShardStateAction.java | 67 ++++++++----------- .../cluster/IndicesClusterStateService.java | 11 +-- .../action/shard/ShardStateActionTests.java | 18 ++--- .../DiscoveryWithServiceDisruptionsIT.java | 2 +- 5 files changed, 45 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 930a43fd840..b2972201808 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -889,7 +889,7 @@ public abstract class TransportReplicationAction { @@ -392,9 +381,9 @@ public class ShardStateAction extends AbstractComponent { * Any other exception is communicated to the requester via * this notification. * - * @param e the unexpected cause of the failure on the master + * @param t the unexpected cause of the failure on the master */ - default void onShardFailedFailure(final Exception e) { + default void onFailure(final Throwable t) { } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 80ad8c766e6..901fa403ba2 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -563,8 +563,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { try { if (indexShard.recoverFromStore(nodes.localNode())) { - shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from store"); + shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store", SHARD_STATE_ACTION_LISTENER); } } catch (Throwable t) { handleRecoveryFailure(indexService, shardRouting, true, t); @@ -663,7 +664,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent exception = new AtomicReference<>(); + AtomicReference throwable = new AtomicReference<>(); LongConsumer retryLoop = requestId -> { if (randomBoolean()) { @@ -243,9 +243,9 @@ public class ShardStateActionTests extends ESTestCase { } @Override - public void onShardFailedFailure(Exception e) { + public void onFailure(Throwable t) { success.set(false); - exception.set(e); + throwable.set(t); latch.countDown(); assert false; } @@ -258,7 +258,7 @@ public class ShardStateActionTests extends ESTestCase { retryLoop.accept(capturedRequests[0].requestId); latch.await(); - assertNull(exception.get()); + assertNull(throwable.get()); assertThat(retries.get(), equalTo(numberOfRetries)); assertTrue(success.get()); } @@ -280,7 +280,7 @@ public class ShardStateActionTests extends ESTestCase { } @Override - public void onShardFailedFailure(Exception e) { + public void onFailure(Throwable t) { failure.set(true); } }); diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 2af9cdc87a4..e9fa8e495d9 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -922,7 +922,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } @Override - public void onShardFailedFailure(Exception e) { + public void onFailure(Throwable t) { success.set(false); latch.countDown(); assert false; From b7c1847c413daec94847abe9fdf50fa291bfdde4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Jan 2016 06:58:54 -0500 Subject: [PATCH 2/2] Add logging statement on shard state request send This commit restores a debug-level logging statement when sending shard state requests to the master node. --- .../elasticsearch/cluster/action/shard/ShardStateAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index b10f6416a9e..b4316ff1fed 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.NodeDisconnectedException; -import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -90,6 +89,7 @@ public class ShardStateAction extends AbstractComponent { logger.warn("{} no master known for action [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting()); waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener); } else { + logger.debug("{} sending [{}] for shard [{}]", shardRoutingEntry.getShardRouting().getId(), actionName, shardRoutingEntry); transportService.sendRequest(masterNode, actionName, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override