Fix transport exceptions for shard state actions

This commit fixes an issue in the handling of TransportExceptions in
ShardStateAction. There were two cases not being handled correctly.
 - when the local node is shutting down, handlers will be notified with
   a TransportException with a message starting "transport stopped"
 - when the remote node disconnects, handlers will be notified with a
   NodeDisconnectedException

In both of these cases, the cause of the exception will be null and this
was incorrectly being handled. The first case can passed to the listener
like any other critical non-channel failure, and the second case can be
handled by modifying the logic for detecting master channel exceptions.

There was a third case of NodeNotConnectedException that was not being
treated as a master channel exception but should be.

This commit adds an integration test that simulates the handling of a
shard failure request during a network partition. By isolating the
master from the cluster while a shard failed request is in flight, this
test simulates that we wait until a new master is elected and then retry
sending that shard failed request to the newly elected master.

This commit adds methods to CapturingTransport to separate local and
remote transport exceptions. The motivation for this change is that
local transport exceptions are delivered to listeners (usually, but not
always) wrapped in SendRequestTransportException while remote transport
exceptions are delivered to listeners wrapped in
RemoteTransportException. By making this distinction clear in the
CapturingTransport, this makes it less likely that tests will make
incorrect assumptions about the exceptions coming out of the transport
layer to listeners.

Closes #16057
This commit is contained in:
Jason Tedor 2016-01-18 05:24:03 -05:00 committed by Boaz Leskes
parent fc4e8fd937
commit 296ab3ec16
8 changed files with 155 additions and 34 deletions

View File

@ -47,8 +47,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -58,11 +60,8 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
@ -111,8 +110,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
assert exp.getCause() != null : exp;
if (isMasterChannelException(exp.getCause())) {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
@ -123,14 +121,14 @@ public class ShardStateAction extends AbstractComponent {
}
}
private static Set<Class<?>> MASTER_CHANNEL_EXCEPTIONS =
new HashSet<>(Arrays.asList(
NotMasterException.class,
NodeDisconnectedException.class,
Discovery.FailedToCommitClusterStateException.class
));
private static boolean isMasterChannelException(Throwable cause) {
return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass());
private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{
NotMasterException.class,
ConnectTransportException.class,
Discovery.FailedToCommitClusterStateException.class
};
private static boolean isMasterChannelException(TransportException exp) {
return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null;
}
// visible for testing
@ -399,4 +397,5 @@ public class ShardStateAction extends AbstractComponent {
default void onShardFailedFailure(final Exception e) {
}
}
}

View File

@ -410,7 +410,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
// simulate node failure
totalShards += map.get(entry.getKey()).size();
totalFailedShards += map.get(entry.getKey()).size();
transport.handleResponse(requestId, new Exception());
transport.handleRemoteError(requestId, new Exception());
} else {
List<ShardRouting> shards = map.get(entry.getKey());
List<TransportBroadcastByNodeAction.EmptyResult> shardResults = new ArrayList<>();

View File

@ -297,14 +297,14 @@ public class TransportMasterNodeActionTests extends ESTestCase {
assertThat(capturedRequest.action, equalTo("testAction"));
if (failsWithConnectTransportException) {
transport.handleResponse(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
assertFalse(listener.isDone());
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
assertTrue(listener.isDone());
listener.get();
} else {
Throwable t = new Throwable();
transport.handleResponse(capturedRequest.requestId, t);
transport.handleRemoteError(capturedRequest.requestId, t);
assertTrue(listener.isDone());
try {
listener.get();

View File

@ -546,7 +546,7 @@ public class TransportReplicationActionTests extends ESTestCase {
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName());
transport.handleResponse(capturedRequest.requestId, t);
transport.handleRemoteError(capturedRequest.requestId, t);
if (criticalFailure) {
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, shardFailedRequests.length);
@ -565,7 +565,7 @@ public class TransportReplicationActionTests extends ESTestCase {
for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) {
// force a new cluster state to simulate a new master having been elected
clusterService.setState(ClusterState.builder(clusterService.state()));
transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test"));
transport.handleRemoteError(currentRequest.requestId, new NotMasterException("shard-failed-test"));
CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, retryRequests.length);
currentRequest = retryRequests[0];
@ -662,7 +662,7 @@ public class TransportReplicationActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] replicationRequests = transport.getCapturedRequestsAndClear();
assertThat(replicationRequests.length, equalTo(1));
// try with failure response
transport.handleResponse(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
transport.handleRemoteError(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, shardFailedRequests.length);
transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE);

View File

@ -198,7 +198,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
// this should not trigger retry or anything and the listener should report exception immediately
transport.handleResponse(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
try {
// result should return immediately
@ -240,7 +240,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
// trigger cluster state observer
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
assertThat(transport.capturedRequests().length, equalTo(1));
@ -258,7 +258,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
// wait until the timeout was triggered and we actually tried to send for the second time
assertBusy(new Runnable() {
@ -270,7 +270,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
// let it fail the second time too
requestId = transport.capturedRequests()[0].requestId;
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
try {
// result should return immediately
assertTrue(listener.isDone());
@ -313,4 +313,4 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
}
}
}
}
}

View File

@ -37,6 +37,9 @@ import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -216,11 +219,17 @@ public class ShardStateActionTests extends ESTestCase {
AtomicReference<Exception> exception = new AtomicReference<>();
LongConsumer retryLoop = requestId -> {
List<Exception> possibleExceptions = new ArrayList<>();
possibleExceptions.add(new NotMasterException("simulated"));
possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME));
possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated"));
transport.handleResponse(requestId, randomFrom(possibleExceptions));
if (randomBoolean()) {
transport.handleRemoteError(
requestId,
randomFrom(new NotMasterException("simulated"), new Discovery.FailedToCommitClusterStateException("simulated")));
} else {
if (randomBoolean()) {
transport.handleLocalError(requestId, new NodeNotConnectedException(null, "simulated"));
} else {
transport.handleError(requestId, new NodeDisconnectedException(null, ShardStateAction.SHARD_FAILED_ACTION_NAME));
}
}
};
final int numberOfRetries = randomIntBetween(1, 256);
@ -279,7 +288,7 @@ public class ShardStateActionTests extends ESTestCase {
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
assertFalse(failure.get());
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated"));
transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated"));
assertTrue(failure.get());
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
@ -30,12 +31,15 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
@ -96,6 +100,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -883,6 +888,71 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
internalCluster().stopRandomNonMasterNode();
}
// simulate handling of sending shard failure during an isolation
public void testSendingShardFailure() throws Exception {
List<String> nodes = startCluster(3, 2);
String masterNode = internalCluster().getMasterName();
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
String nonMasterNode = randomFrom(nonMasterNodes);
assertAcked(prepareCreate("test")
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
));
ensureGreen();
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
// fail a random shard
ShardRouting failedShard =
randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
String indexUUID = clusterService().state().metaData().index("test").getIndexUUID();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean();
String isolatedNode = randomBoolean() ? masterNode : nonMasterNode;
NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
networkPartition.startDisrupting();
service.shardFailed(failedShard, indexUUID, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
latch.countDown();
}
@Override
public void onShardFailedFailure(Exception e) {
success.set(false);
latch.countDown();
assert false;
}
});
if (isolatedNode.equals(nonMasterNode)) {
assertNoMaster(nonMasterNode);
} else {
ensureStableCluster(2, nonMasterNode);
}
// heal the partition
networkPartition.removeAndEnsureHealthy(internalCluster());
// the cluster should stabilize
ensureStableCluster(3);
latch.await();
// the listener should be notified
assertTrue(success.get());
// the failed shard should be gone
List<ShardRouting> shards = clusterService().state().getRoutingTable().allShards("test");
for (ShardRouting shard : shards) {
assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId())));
}
}
public void testClusterFormingWithASlowNode() throws Exception {
configureUnicastCluster(3, null, 2);

View File

@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -26,6 +28,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -40,9 +43,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
public class CapturingTransport implements Transport {
private TransportServiceAdapter adapter;
static public class CapturedRequest {
@ -59,6 +65,7 @@ public class CapturingTransport implements Transport {
}
}
private ConcurrentMap<Long, Tuple<DiscoveryNode, String>> requests = new ConcurrentHashMap<>();
private BlockingQueue<CapturedRequest> capturedRequests = ConcurrentCollections.newBlockingQueue();
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
@ -120,14 +127,50 @@ public class CapturingTransport implements Transport {
adapter.onResponseReceived(requestId).handleResponse(response);
}
/** simulate a remote error for the given requesTId */
public void handleResponse(final long requestId, final Throwable t) {
adapter.onResponseReceived(requestId).handleException(new RemoteTransportException("remote failure", t));
/**
* simulate a local error for the given requestId, will be wrapped
* by a {@link SendRequestTransportException}
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
*/
public void handleLocalError(final long requestId, final Throwable t) {
Tuple<DiscoveryNode, String> request = requests.get(requestId);
assert request != null;
this.handleError(requestId, new SendRequestTransportException(request.v1(), request.v2(), t));
}
/**
* simulate a remote error for the given requestId, will be wrapped
* by a {@link RemoteTransportException}
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
*/
public void handleRemoteError(final long requestId, final Throwable t) {
this.handleError(requestId, new RemoteTransportException("remote failure", t));
}
/**
* simulate an error for the given requestId, unlike
* {@link #handleLocalError(long, Throwable)} and
* {@link #handleRemoteError(long, Throwable)}, the provided
* exception will not be wrapped but will be delivered to the
* transport layer as is
*
* @param requestId the id corresponding to the captured send
* request
* @param e the failure
*/
public void handleError(final long requestId, final TransportException e) {
adapter.onResponseReceived(requestId).handleException(e);
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
@ -149,7 +192,6 @@ public class CapturingTransport implements Transport {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
// WTF
return new TransportAddress[0];
}
@ -217,4 +259,5 @@ public class CapturingTransport implements Transport {
public List<String> getLocalAddresses() {
return Collections.emptyList();
}
}