Merge branch 'master' into new_index_settings

This commit is contained in:
Simon Willnauer 2016-01-19 10:50:11 +01:00
commit fa0e1488aa
8 changed files with 155 additions and 34 deletions

View File

@ -47,8 +47,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -58,11 +60,8 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
@ -111,8 +110,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
assert exp.getCause() != null : exp;
if (isMasterChannelException(exp.getCause())) {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
@ -123,14 +121,14 @@ public class ShardStateAction extends AbstractComponent {
}
}
private static Set<Class<?>> MASTER_CHANNEL_EXCEPTIONS =
new HashSet<>(Arrays.asList(
private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{
NotMasterException.class,
NodeDisconnectedException.class,
ConnectTransportException.class,
Discovery.FailedToCommitClusterStateException.class
));
private static boolean isMasterChannelException(Throwable cause) {
return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass());
};
/**
 * Decides whether a transport failure matches one of the classes listed in
 * {@code MASTER_CHANNEL_EXCEPTIONS}.
 *
 * @param exp the transport exception handed to the response handler
 * @return {@code true} when {@code ExceptionsHelper.unwrap} returns a non-null
 *         result for {@code exp} and the listed classes
 */
private static boolean isMasterChannelException(TransportException exp) {
// NOTE(review): presumably unwrap searches exp and its cause chain for an instance of one of the listed classes - confirm against ExceptionsHelper
return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null;
}
// visible for testing
@ -399,4 +397,5 @@ public class ShardStateAction extends AbstractComponent {
default void onShardFailedFailure(final Exception e) {
}
}
}

View File

@ -410,7 +410,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
// simulate node failure
totalShards += map.get(entry.getKey()).size();
totalFailedShards += map.get(entry.getKey()).size();
transport.handleResponse(requestId, new Exception());
transport.handleRemoteError(requestId, new Exception());
} else {
List<ShardRouting> shards = map.get(entry.getKey());
List<TransportBroadcastByNodeAction.EmptyResult> shardResults = new ArrayList<>();

View File

@ -297,14 +297,14 @@ public class TransportMasterNodeActionTests extends ESTestCase {
assertThat(capturedRequest.action, equalTo("testAction"));
if (failsWithConnectTransportException) {
transport.handleResponse(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
assertFalse(listener.isDone());
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
assertTrue(listener.isDone());
listener.get();
} else {
Throwable t = new Throwable();
transport.handleResponse(capturedRequest.requestId, t);
transport.handleRemoteError(capturedRequest.requestId, t);
assertTrue(listener.isDone());
try {
listener.get();

View File

@ -546,7 +546,7 @@ public class TransportReplicationActionTests extends ESTestCase {
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName());
transport.handleResponse(capturedRequest.requestId, t);
transport.handleRemoteError(capturedRequest.requestId, t);
if (criticalFailure) {
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, shardFailedRequests.length);
@ -565,7 +565,7 @@ public class TransportReplicationActionTests extends ESTestCase {
for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) {
// force a new cluster state to simulate a new master having been elected
clusterService.setState(ClusterState.builder(clusterService.state()));
transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test"));
transport.handleRemoteError(currentRequest.requestId, new NotMasterException("shard-failed-test"));
CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, retryRequests.length);
currentRequest = retryRequests[0];
@ -662,7 +662,7 @@ public class TransportReplicationActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] replicationRequests = transport.getCapturedRequestsAndClear();
assertThat(replicationRequests.length, equalTo(1));
// try with failure response
transport.handleResponse(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
transport.handleRemoteError(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, shardFailedRequests.length);
transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE);

View File

@ -198,7 +198,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
// this should not trigger retry or anything and the listener should report exception immediately
transport.handleResponse(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
try {
// result should return immediately
@ -240,7 +240,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
// trigger cluster state observer
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
assertThat(transport.capturedRequests().length, equalTo(1));
@ -258,7 +258,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
// wait until the timeout was triggered and we actually tried to send for the second time
assertBusy(new Runnable() {
@ -270,7 +270,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
// let it fail the second time too
requestId = transport.capturedRequests()[0].requestId;
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
try {
// result should return immediately
assertTrue(listener.isDone());

View File

@ -37,6 +37,9 @@ import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -216,11 +219,17 @@ public class ShardStateActionTests extends ESTestCase {
AtomicReference<Exception> exception = new AtomicReference<>();
LongConsumer retryLoop = requestId -> {
List<Exception> possibleExceptions = new ArrayList<>();
possibleExceptions.add(new NotMasterException("simulated"));
possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME));
possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated"));
transport.handleResponse(requestId, randomFrom(possibleExceptions));
if (randomBoolean()) {
transport.handleRemoteError(
requestId,
randomFrom(new NotMasterException("simulated"), new Discovery.FailedToCommitClusterStateException("simulated")));
} else {
if (randomBoolean()) {
transport.handleLocalError(requestId, new NodeNotConnectedException(null, "simulated"));
} else {
transport.handleError(requestId, new NodeDisconnectedException(null, ShardStateAction.SHARD_FAILED_ACTION_NAME));
}
}
};
final int numberOfRetries = randomIntBetween(1, 256);
@ -279,7 +288,7 @@ public class ShardStateActionTests extends ESTestCase {
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
assertFalse(failure.get());
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated"));
transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated"));
assertTrue(failure.get());
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
@ -30,12 +31,15 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
@ -96,6 +100,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -883,6 +888,71 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
internalCluster().stopRandomNonMasterNode();
}
// simulate handling of sending shard failure during an isolation
/**
 * Reports a started shard of index "test" as failed from a non-master node while
 * either the master or that non-master node is isolated by a network partition,
 * then heals the partition and verifies that the listener was notified of success
 * and that no shard of "test" still carries the failed shard's allocation id.
 */
public void testSendingShardFailure() throws Exception {
// NOTE(review): presumably 3 nodes with a minimum of 2 master nodes - confirm against startCluster
List<String> nodes = startCluster(3, 2);
String masterNode = internalCluster().getMasterName();
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
String nonMasterNode = randomFrom(nonMasterNodes);
assertAcked(prepareCreate("test")
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
));
ensureGreen();
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
// fail a random shard
ShardRouting failedShard =
randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
// the shard failure is sent through the ShardStateAction instance of the chosen non-master node
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
String indexUUID = clusterService().state().metaData().index("test").getIndexUUID();
// latch is released by either listener callback; success records which callback ran
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean();
// randomly isolate either the master or the node that sends the shard failure
String isolatedNode = randomBoolean() ? masterNode : nonMasterNode;
NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
// the disruption is started before the shard failure is sent
networkPartition.startDisrupting();
service.shardFailed(failedShard, indexUUID, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
latch.countDown();
}
@Override
public void onShardFailedFailure(Exception e) {
success.set(false);
latch.countDown();
// NOTE(review): this assert runs after countDown() and presumably on a different thread than the test - confirm the AssertionError is surfaced; assertTrue(success.get()) below also catches this path
assert false;
}
});
if (isolatedNode.equals(nonMasterNode)) {
// the sending node itself is isolated
assertNoMaster(nonMasterNode);
} else {
// the master is isolated
ensureStableCluster(2, nonMasterNode);
}
// heal the partition
networkPartition.removeAndEnsureHealthy(internalCluster());
// the cluster should stabilize
ensureStableCluster(3);
// NOTE(review): await() has no timeout here; if neither callback fires the test blocks until an outer timeout, if any - confirm
latch.await();
// the listener should be notified
assertTrue(success.get());
// the failed shard should be gone
List<ShardRouting> shards = clusterService().state().getRoutingTable().allShards("test");
for (ShardRouting shard : shards) {
assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId())));
}
}
public void testClusterFormingWithASlowNode() throws Exception {
configureUnicastCluster(3, null, 2);

View File

@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -26,6 +28,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -40,9 +43,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
public class CapturingTransport implements Transport {
private TransportServiceAdapter adapter;
static public class CapturedRequest {
@ -59,6 +65,7 @@ public class CapturingTransport implements Transport {
}
}
private ConcurrentMap<Long, Tuple<DiscoveryNode, String>> requests = new ConcurrentHashMap<>();
private BlockingQueue<CapturedRequest> capturedRequests = ConcurrentCollections.newBlockingQueue();
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
@ -120,14 +127,50 @@ public class CapturingTransport implements Transport {
adapter.onResponseReceived(requestId).handleResponse(response);
}
/** simulate a remote error for the given requestId */
public void handleResponse(final long requestId, final Throwable t) {
adapter.onResponseReceived(requestId).handleException(new RemoteTransportException("remote failure", t));
/**
 * Simulates a failure on the sending side for a previously captured request.
 * The given throwable is wrapped in a {@link SendRequestTransportException}
 * built from the node and action that were recorded for the request id, and
 * the wrapper is then delivered through {@link #handleError(long, TransportException)}.
 *
 * @param requestId the id corresponding to the captured send request
 * @param t         the failure to wrap
 */
public void handleLocalError(final long requestId, final Throwable t) {
    final Tuple<DiscoveryNode, String> nodeAndAction = requests.get(requestId);
    // a missing entry means no send was recorded for this id
    assert nodeAndAction != null;
    final DiscoveryNode targetNode = nodeAndAction.v1();
    final String actionName = nodeAndAction.v2();
    handleError(requestId, new SendRequestTransportException(targetNode, actionName, t));
}
/**
 * Simulates a failure reported by the remote side for the given request id.
 * The given throwable is wrapped in a {@link RemoteTransportException} with the
 * message "remote failure", and the wrapper is then delivered through
 * {@link #handleError(long, TransportException)}.
 *
 * @param requestId the id corresponding to the captured send request
 * @param t         the failure to wrap
 */
public void handleRemoteError(final long requestId, final Throwable t) {
    final RemoteTransportException wrapped = new RemoteTransportException("remote failure", t);
    handleError(requestId, wrapped);
}
/**
 * Simulate an error for the given requestId. Unlike
 * {@link #handleLocalError(long, Throwable)} and
 * {@link #handleRemoteError(long, Throwable)}, the provided
 * exception is not wrapped: it is passed unchanged to the
 * handler that the adapter returns for the request.
 *
 * @param requestId the id corresponding to the captured send
 *                  request
 * @param e         the failure to deliver as is
 */
public void handleError(final long requestId, final TransportException e) {
adapter.onResponseReceived(requestId).handleException(e);
}
/**
 * Records the request instead of sending it: the target node and action are
 * stored under the request id, and a {@code CapturedRequest} is appended to the
 * queue of captured requests.
 */
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
    final Tuple<DiscoveryNode, String> nodeAndAction = Tuple.tuple(node, action);
    requests.put(requestId, nodeAndAction);
    final CapturedRequest captured = new CapturedRequest(node, requestId, action, request);
    capturedRequests.add(captured);
}
@ -149,7 +192,6 @@ public class CapturingTransport implements Transport {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
// no address parsing is done here: both arguments are ignored and an empty array is returned
return new TransportAddress[0];
}
@ -217,4 +259,5 @@ public class CapturingTransport implements Transport {
public List<String> getLocalAddresses() {
// always reports an empty list of local addresses
return Collections.emptyList();
}
}