Add listener mechanism for failures to send shard failed

This commit adds a listener mechanism for executing callbacks when
exceptional situations occur while sending a shard failure message to
the master. The two exceptional situations that can occur are: the
master is not known, and the transport request exception handler is
invoked for any reason after the shard failed request is sent to the
master. This commit only adds the infrastructure for executing
callbacks when one of these exceptional situations occurs; no effort
is made to properly handle the exceptional situations themselves. Unit
tests are added for ShardStateAction to verify that the listener
infrastructure is correct.

Relates #14252
Jason Tedor 2015-10-26 20:28:25 -04:00
parent 68c6c6400d
commit 172ad38408
5 changed files with 211 additions and 10 deletions
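
For illustration, here is a minimal caller-side sketch of the new hook (not part of this commit): the ShardStateAction.Listener callbacks and the extended shardFailed signature come from the diff below, while shardStateAction, shardRouting, indexUUID, failure, and logger are assumed to already be in scope.

shardStateAction.shardFailed(shardRouting, indexUUID, "engine failure", failure,
    new ShardStateAction.Listener() {
        @Override
        public void onShardFailedNoMaster() {
            // no master is currently known, so the shard failed request was never sent
            logger.warn("no master known, shard failed message not sent for {}", shardRouting);
        }

        @Override
        public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
            // the transport exception handler fired after the request was sent to the master
            logger.warn("failed to send shard failed message to {}", e, master);
        }
    });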

TransportReplicationAction.java

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
+import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -686,7 +687,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
/**
* inner class is responsible for send the requests to all replica shards and manage the responses
*/
-final class ReplicationPhase extends AbstractRunnable {
+final class ReplicationPhase extends AbstractRunnable implements ShardStateAction.Listener {
private final ReplicaRequest replicaRequest;
private final Response finalResponse;
@@ -821,6 +822,16 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
forceFinishAsFailed(t);
}
+@Override
+public void onShardFailedNoMaster() {
+}
+@Override
+public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
+}
/**
* start sending current requests to replicas
*/
@@ -886,7 +897,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
if (ignoreReplicaException(exp) == false) {
logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node);
-shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp);
+shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp, ReplicationPhase.this);
}
}

NoOpShardStateActionListener.java

@@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
public class NoOpShardStateActionListener implements ShardStateAction.Listener {
}

ShardStateAction.java

@@ -77,27 +77,29 @@ public class ShardStateAction extends AbstractComponent {
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler());
}
-public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure) {
+public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
+listener.onShardFailedNoMaster();
return;
}
-innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
+innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener);
}
-public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure) {
+public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
-innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
+innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener);
}
-private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure) {
+private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, Listener listener) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
+listener.onShardFailedFailure(masterNode, exp);
}
});
}
@@ -284,4 +286,9 @@ public class ShardStateAction extends AbstractComponent {
return "" + shardRouting + ", indexUUID [" + indexUUID + "], message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]";
}
}
+public interface Listener {
+default void onShardFailedNoMaster() {}
+default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
+}
}

IndicesClusterStateService.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
+import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -76,6 +77,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
+private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new NoOpShardStateActionListener();
// a map of mappings type we have seen per index due to cluster state
// we need this so we won't remove types automatically created as part of the indexing process
private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
@@ -473,7 +476,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(),
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null);
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
}
} else {
// the master thinks we are started, but we don't have this shard at all, mark it as failed
@@ -606,7 +609,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null);
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
}
return;
}
@@ -802,7 +805,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
-shardStateAction.shardFailed(shardRouting, indexUUID, message, failure);
+shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
}

ShardStateActionTests.java

@@ -0,0 +1,157 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.*;
public class ShardStateActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
private ShardStateAction shardStateAction;
private CapturingTransport transport;
private TransportService transportService;
private TestClusterService clusterService;
@BeforeClass
public static void startThreadPool() {
THREAD_POOL = new ThreadPool("ShardStateActionTest");
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.transport = new CapturingTransport();
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
}
@Override
@After
public void tearDown() throws Exception {
transportService.stop();
super.tearDown();
}
@AfterClass
public static void stopThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
THREAD_POOL = null;
}
public void testNoMaster() {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes());
builder.masterNodeId(null);
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean noMaster = new AtomicBoolean();
assert !noMaster.get();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onShardFailedNoMaster() {
noMaster.set(true);
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
}
});
assertTrue(noMaster.get());
}
public void testFailure() {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean failure = new AtomicBoolean();
assert !failure.get();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onShardFailedNoMaster() {
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
failure.set(true);
}
});
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
transport.clear();
assertThat(capturedRequests.length, equalTo(1));
assert !failure.get();
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated"));
assertTrue(failure.get());
}
private ShardRouting getRandomShardRouting(String index) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();
ShardRouting shardRouting = shardsIterator.nextOrNull();
assert shardRouting != null;
return shardRouting;
}
private Throwable getSimulatedFailure() {
return new CorruptIndexException("simulated", (String) null);
}
}