From 296ab3ec16c285bfc37fba40b2ddbde1e56bfc7c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 18 Jan 2016 05:24:03 -0500 Subject: [PATCH] Fix transport exceptions for shard state actions This commit fixes an issue in the handling of TransportExceptions in ShardStateAction. There were two cases not being handled correctly. - when the local node is shutting down, handlers will be notified with a TransportException with a message starting "transport stopped" - when the remote node disconnects, handlers will be notified with a NodeDisconnectedException In both of these cases, the cause of the exception will be null and this was incorrectly being handled. The first case can be passed to the listener like any other critical non-channel failure, and the second case can be handled by modifying the logic for detecting master channel exceptions. There was a third case of NodeNotConnectedException that was not being treated as a master channel exception but should be. This commit adds an integration test that simulates the handling of a shard failure request during a network partition. By isolating the master from the cluster while a shard failed request is in flight, this test simulates that we wait until a new master is elected and then retry sending that shard failed request to the newly elected master. This commit adds methods to CapturingTransport to separate local and remote transport exceptions. The motivation for this change is that local transport exceptions are delivered to listeners (usually, but not always) wrapped in SendRequestTransportException while remote transport exceptions are delivered to listeners wrapped in RemoteTransportException. By making this distinction clear in the CapturingTransport, this makes it less likely that tests will make incorrect assumptions about the exceptions coming out of the transport layer to listeners. 
Closes #16057 --- .../action/shard/ShardStateAction.java | 25 ++++--- .../TransportBroadcastByNodeActionTests.java | 2 +- .../TransportMasterNodeActionTests.java | 4 +- .../TransportReplicationActionTests.java | 6 +- ...ortInstanceSingleOperationActionTests.java | 10 +-- .../action/shard/ShardStateActionTests.java | 21 ++++-- .../DiscoveryWithServiceDisruptionsIT.java | 70 +++++++++++++++++++ .../test/transport/CapturingTransport.java | 51 ++++++++++++-- 8 files changed, 155 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index a9c69007aee..035c21a20b4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -47,8 +47,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -58,11 +60,8 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Set; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -111,8 +110,7 @@ public class ShardStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) 
{ - assert exp.getCause() != null : exp; - if (isMasterChannelException(exp.getCause())) { + if (isMasterChannelException(exp)) { waitForNewMasterAndRetry(observer, shardRoutingEntry, listener); } else { logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry); @@ -123,14 +121,14 @@ public class ShardStateAction extends AbstractComponent { } } - private static Set> MASTER_CHANNEL_EXCEPTIONS = - new HashSet<>(Arrays.asList( - NotMasterException.class, - NodeDisconnectedException.class, - Discovery.FailedToCommitClusterStateException.class - )); - private static boolean isMasterChannelException(Throwable cause) { - return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass()); + private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{ + NotMasterException.class, + ConnectTransportException.class, + Discovery.FailedToCommitClusterStateException.class + }; + + private static boolean isMasterChannelException(TransportException exp) { + return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null; } // visible for testing @@ -399,4 +397,5 @@ public class ShardStateAction extends AbstractComponent { default void onShardFailedFailure(final Exception e) { } } + } diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 90d91dfb197..d94049c036f 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -410,7 +410,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { // simulate node failure totalShards += map.get(entry.getKey()).size(); totalFailedShards += 
map.get(entry.getKey()).size(); - transport.handleResponse(requestId, new Exception()); + transport.handleRemoteError(requestId, new Exception()); } else { List shards = map.get(entry.getKey()); List shardResults = new ArrayList<>(); diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 980558c2716..c9a7d9bd2d2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -297,14 +297,14 @@ public class TransportMasterNodeActionTests extends ESTestCase { assertThat(capturedRequest.action, equalTo("testAction")); if (failsWithConnectTransportException) { - transport.handleResponse(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error")); + transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error")); assertFalse(listener.isDone()); clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes)); assertTrue(listener.isDone()); listener.get(); } else { Throwable t = new Throwable(); - transport.handleResponse(capturedRequest.requestId, t); + transport.handleRemoteError(capturedRequest.requestId, t); assertTrue(listener.isDone()); try { listener.get(); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index e90c213b697..9fdbdf1cb38 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -546,7 +546,7 @@ public class 
TransportReplicationActionTests extends ESTestCase { t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); } logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName()); - transport.handleResponse(capturedRequest.requestId, t); + transport.handleRemoteError(capturedRequest.requestId, t); if (criticalFailure) { CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, shardFailedRequests.length); @@ -565,7 +565,7 @@ public class TransportReplicationActionTests extends ESTestCase { for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) { // force a new cluster state to simulate a new master having been elected clusterService.setState(ClusterState.builder(clusterService.state())); - transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test")); + transport.handleRemoteError(currentRequest.requestId, new NotMasterException("shard-failed-test")); CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, retryRequests.length); currentRequest = retryRequests[0]; @@ -662,7 +662,7 @@ public class TransportReplicationActionTests extends ESTestCase { CapturingTransport.CapturedRequest[] replicationRequests = transport.getCapturedRequestsAndClear(); assertThat(replicationRequests.length, equalTo(1)); // try with failure response - transport.handleResponse(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null)); + transport.handleRemoteError(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, shardFailedRequests.length); transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE); diff --git 
a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index fce431238dd..344846c363e 100644 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -198,7 +198,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { long requestId = transport.capturedRequests()[0].requestId; transport.clear(); // this should not trigger retry or anything and the listener should report exception immediately - transport.handleResponse(requestId, new TransportException("a generic transport exception", new Exception("generic test exception"))); + transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception"))); try { // result should return immediately @@ -240,7 +240,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { long requestId = transport.capturedRequests()[0].requestId; transport.clear(); DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); - transport.handleResponse(requestId, new ConnectTransportException(node, "test exception")); + transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); // trigger cluster state observer clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); assertThat(transport.capturedRequests().length, equalTo(1)); @@ -258,7 +258,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { long requestId = transport.capturedRequests()[0].requestId; transport.clear(); DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); - 
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception")); + transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); // wait until the timeout was triggered and we actually tried to send for the second time assertBusy(new Runnable() { @@ -270,7 +270,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { // let it fail the second time too requestId = transport.capturedRequests()[0].requestId; - transport.handleResponse(requestId, new ConnectTransportException(node, "test exception")); + transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); try { // result should return immediately assertTrue(listener.isDone()); @@ -313,4 +313,4 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { } } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 8bc43114e7d..0a00620fcc3 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -37,6 +37,9 @@ import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -216,11 +219,17 @@ public class ShardStateActionTests extends ESTestCase { AtomicReference 
exception = new AtomicReference<>(); LongConsumer retryLoop = requestId -> { - List possibleExceptions = new ArrayList<>(); - possibleExceptions.add(new NotMasterException("simulated")); - possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); - possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated")); - transport.handleResponse(requestId, randomFrom(possibleExceptions)); + if (randomBoolean()) { + transport.handleRemoteError( + requestId, + randomFrom(new NotMasterException("simulated"), new Discovery.FailedToCommitClusterStateException("simulated"))); + } else { + if (randomBoolean()) { + transport.handleLocalError(requestId, new NodeNotConnectedException(null, "simulated")); + } else { + transport.handleError(requestId, new NodeDisconnectedException(null, ShardStateAction.SHARD_FAILED_ACTION_NAME)); + } + } }; final int numberOfRetries = randomIntBetween(1, 256); @@ -279,7 +288,7 @@ public class ShardStateActionTests extends ESTestCase { final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); assertFalse(failure.get()); - transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); + transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated")); assertTrue(failure.get()); } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 67d3df42b38..2af9cdc87a4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery; +import org.apache.lucene.index.CorruptIndexException; 
import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetResponse; @@ -30,12 +31,15 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.Murmur3HashFunction; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -96,6 +100,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -883,6 +888,71 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { internalCluster().stopRandomNonMasterNode(); } + // simulate handling of sending shard failure during an isolation + public void testSendingShardFailure() throws Exception { + List nodes = startCluster(3, 2); + String masterNode = internalCluster().getMasterName(); + List nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList()); + String nonMasterNode = randomFrom(nonMasterNodes); + 
assertAcked(prepareCreate("test") + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + )); + ensureGreen(); + String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId(); + + // fail a random shard + ShardRouting failedShard = + randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)); + ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode); + String indexUUID = clusterService().state().metaData().index("test").getIndexUUID(); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean success = new AtomicBoolean(); + + String isolatedNode = randomBoolean() ? masterNode : nonMasterNode; + NetworkPartition networkPartition = addRandomIsolation(isolatedNode); + networkPartition.startDisrupting(); + + service.shardFailed(failedShard, indexUUID, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + + @Override + public void onShardFailedFailure(Exception e) { + success.set(false); + latch.countDown(); + assert false; + } + }); + + if (isolatedNode.equals(nonMasterNode)) { + assertNoMaster(nonMasterNode); + } else { + ensureStableCluster(2, nonMasterNode); + } + + // heal the partition + networkPartition.removeAndEnsureHealthy(internalCluster()); + + // the cluster should stabilize + ensureStableCluster(3); + + latch.await(); + + // the listener should be notified + assertTrue(success.get()); + + // the failed shard should be gone + List shards = clusterService().state().getRoutingTable().allShards("test"); + for (ShardRouting shard : shards) { + assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId()))); + } + } + public void testClusterFormingWithASlowNode() throws Exception { 
configureUnicastCluster(3, null, 2); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index eefdb996e65..48d79ef064b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.test.transport; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -26,6 +28,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -40,9 +43,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** A transport class that doesn't send anything but rather captures all requests for inspection from tests */ public class CapturingTransport implements Transport { + private TransportServiceAdapter adapter; static public class CapturedRequest { @@ -59,6 +65,7 @@ public class CapturingTransport implements Transport { } } + private ConcurrentMap> requests = new ConcurrentHashMap<>(); private BlockingQueue 
capturedRequests = ConcurrentCollections.newBlockingQueue(); /** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */ @@ -120,14 +127,50 @@ public class CapturingTransport implements Transport { adapter.onResponseReceived(requestId).handleResponse(response); } - /** simulate a remote error for the given requesTId */ - public void handleResponse(final long requestId, final Throwable t) { - adapter.onResponseReceived(requestId).handleException(new RemoteTransportException("remote failure", t)); + /** + * simulate a local error for the given requestId, will be wrapped + * by a {@link SendRequestTransportException} + * + * @param requestId the id corresponding to the captured send + * request + * @param t the failure to wrap + */ + public void handleLocalError(final long requestId, final Throwable t) { + Tuple request = requests.get(requestId); + assert request != null; + this.handleError(requestId, new SendRequestTransportException(request.v1(), request.v2(), t)); } + /** + * simulate a remote error for the given requestId, will be wrapped + * by a {@link RemoteTransportException} + * + * @param requestId the id corresponding to the captured send + * request + * @param t the failure to wrap + */ + public void handleRemoteError(final long requestId, final Throwable t) { + this.handleError(requestId, new RemoteTransportException("remote failure", t)); + } + + /** + * simulate an error for the given requestId, unlike + * {@link #handleLocalError(long, Throwable)} and + * {@link #handleRemoteError(long, Throwable)}, the provided + * exception will not be wrapped but will be delivered to the + * transport layer as is + * + * @param requestId the id corresponding to the captured send + * request + * @param e the failure + */ + public void handleError(final long requestId, final TransportException e) { + adapter.onResponseReceived(requestId).handleException(e); + } @Override public void sendRequest(DiscoveryNode node, long 
requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + requests.put(requestId, Tuple.tuple(node, action)); capturedRequests.add(new CapturedRequest(node, requestId, action, request)); } @@ -149,7 +192,6 @@ public class CapturingTransport implements Transport { @Override public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { - // WTF return new TransportAddress[0]; } @@ -217,4 +259,5 @@ public class CapturingTransport implements Transport { public List getLocalAddresses() { return Collections.emptyList(); } + }