Merge pull request #14295 from jasontedor/shard-failed-listener

Add listener mechanism for failures to send shard failed
This commit is contained in:
Jason Tedor 2015-10-28 06:39:26 -04:00
commit df4c7c7aee
5 changed files with 211 additions and 10 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -686,7 +687,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
/**
* inner class is responsible for send the requests to all replica shards and manage the responses
*/
final class ReplicationPhase extends AbstractRunnable {
final class ReplicationPhase extends AbstractRunnable implements ShardStateAction.Listener {
private final ReplicaRequest replicaRequest;
private final Response finalResponse;
@ -821,6 +822,16 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
forceFinishAsFailed(t);
}
@Override
public void onShardFailedNoMaster() {
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
}
/**
* start sending current requests to replicas
*/
@ -886,7 +897,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
if (ignoreReplicaException(exp) == false) {
logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node);
shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp);
shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp, ReplicationPhase.this);
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
public class NoOpShardStateActionListener implements ShardStateAction.Listener {
}

View File

@ -77,27 +77,29 @@ public class ShardStateAction extends AbstractComponent {
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler());
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure) {
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
listener.onShardFailedNoMaster();
return;
}
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener);
}
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure) {
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener);
}
private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure) {
private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, Listener listener) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
listener.onShardFailedFailure(masterNode, exp);
}
});
}
@ -284,4 +286,9 @@ public class ShardStateAction extends AbstractComponent {
return "" + shardRouting + ", indexUUID [" + indexUUID + "], message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]";
}
}
public interface Listener {
default void onShardFailedNoMaster() {}
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -76,6 +77,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new NoOpShardStateActionListener();
// a map of mappings type we have seen per index due to cluster state
// we need this so we won't remove types automatically created as part of the indexing process
private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
@ -473,7 +476,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(),
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null);
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
}
} else {
// the master thinks we are started, but we don't have this shard at all, mark it as failed
@ -606,7 +609,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null);
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
}
return;
}
@ -802,7 +805,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure);
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.*;
public class ShardStateActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
private ShardStateAction shardStateAction;
private CapturingTransport transport;
private TransportService transportService;
private TestClusterService clusterService;
@BeforeClass
public static void startThreadPool() {
THREAD_POOL = new ThreadPool("ShardStateActionTest");
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.transport = new CapturingTransport();
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
}
@Override
@After
public void tearDown() throws Exception {
transportService.stop();
super.tearDown();
}
@AfterClass
public static void stopThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
THREAD_POOL = null;
}
public void testNoMaster() {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes());
builder.masterNodeId(null);
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean noMaster = new AtomicBoolean();
assert !noMaster.get();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onShardFailedNoMaster() {
noMaster.set(true);
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
}
});
assertTrue(noMaster.get());
}
public void testFailure() {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean failure = new AtomicBoolean();
assert !failure.get();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onShardFailedNoMaster() {
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
failure.set(true);
}
});
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
transport.clear();
assertThat(capturedRequests.length, equalTo(1));
assert !failure.get();
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated"));
assertTrue(failure.get());
}
private ShardRouting getRandomShardRouting(String index) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();
ShardRouting shardRouting = shardsIterator.nextOrNull();
assert shardRouting != null;
return shardRouting;
}
private Throwable getSimulatedFailure() {
return new CorruptIndexException("simulated", (String) null);
}
}