Introduced an index UUID which is added to the index's settings upon creation. Used that UUID to verify old and delayed shard started/failed events are not applied to newer indexes with the same name.

Also, exceptions while processing batched events do not stop the rest of the events from being processed.

Closes #3778
This commit is contained in:
Boaz Leskes 2013-09-25 17:05:11 +02:00
parent 35990f572b
commit 1644444a4f
8 changed files with 260 additions and 111 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -622,8 +623,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
replicaCounter++;
AtomicInteger counter = new AtomicInteger(replicaCounter);
IndexMetaData indexMetaData = clusterState.metaData().index(request.index());
if (newPrimaryShard != null) {
performOnReplica(response, counter, newPrimaryShard, newPrimaryShard.currentNodeId());
performOnReplica(response, counter, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData);
}
shardIt.reset(); // reset the iterator
@ -647,10 +651,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it
if (!doOnlyOnRelocating) {
performOnReplica(response, counter, shard, shard.currentNodeId());
performOnReplica(response, counter, shard, shard.currentNodeId(), indexMetaData);
}
if (shard.relocating()) {
performOnReplica(response, counter, shard, shard.relocatingNodeId());
performOnReplica(response, counter, shard, shard.relocatingNodeId(), indexMetaData);
}
}
@ -662,7 +666,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId, final IndexMetaData indexMetaData) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!clusterState.nodes().nodeExists(nodeId)) {
@ -685,7 +689,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
public void handleException(TransportException exp) {
if (!ignoreReplicaException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
}
finishIfPossible();
}
@ -708,7 +713,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
@ -725,7 +731,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
// we want to decrement the counter here, in teh failure handling, cause we got rejected
// from executing on the thread pool
@ -739,7 +746,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {

View File

@ -20,9 +20,12 @@
package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -58,8 +61,8 @@ public class ShardStateAction extends AbstractComponent {
private final AllocationService allocationService;
private final ThreadPool threadPool;
private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRouting> failedShardQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRoutingEntry> failedShardQueue = ConcurrentCollections.newBlockingQueue();
@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
@ -74,14 +77,15 @@ public class ShardStateAction extends AbstractComponent {
transportService.registerHandler(ShardFailedTransportHandler.ACTION, new ShardFailedTransportHandler());
}
public void shardFailed(final ShardRouting shardRouting, final String reason) throws ElasticSearchException {
logger.warn("sending failed shard for {}, reason [{}]", shardRouting, reason);
public void shardFailed(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticSearchException {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
logger.warn("{} sending failed shard for {}", shardRouting.shardId(), shardRoutingEntry);
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
innerShardFailed(shardRouting, reason);
innerShardFailed(shardRoutingEntry);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
ShardFailedTransportHandler.ACTION, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to [{}]", exp, clusterService.state().nodes().masterNode());
@ -90,16 +94,18 @@ public class ShardStateAction extends AbstractComponent {
}
}
public void shardStarted(final ShardRouting shardRouting, final String reason) throws ElasticSearchException {
if (logger.isDebugEnabled()) {
logger.debug("sending shard started for {}, reason [{}]", shardRouting, reason);
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticSearchException {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
logger.debug("sending shard started for {}", shardRoutingEntry);
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
innerShardStarted(shardRouting, reason);
innerShardStarted(shardRoutingEntry);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, clusterService.state().nodes().masterNode());
@ -108,16 +114,43 @@ public class ShardStateAction extends AbstractComponent {
}
}
private void innerShardFailed(final ShardRouting shardRouting, final String reason) {
logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason);
failedShardQueue.add(shardRouting);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
List<ShardRouting> shards = new ArrayList<ShardRouting>();
failedShardQueue.drainTo(shards);
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shards);
List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
failedShardQueue.drainTo(shardRoutingEntries);
// nothing to process (a previous event has processed it already)
if (shardRoutingEntries.isEmpty()) {
return currentState;
}
MetaData metaData = currentState.getMetaData();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<ShardRouting>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
// if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexMetaData == null) {
continue;
}
if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
logger.debug("{} ignoring shard failed, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry);
continue;
}
logger.debug("{} will apply shard failed {}", shardRouting.shardId(), shardRoutingEntry);
shardRoutingsToBeApplied.add(shardRouting);
}
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
if (!routingResult.changed()) {
return currentState;
}
@ -131,74 +164,93 @@ public class ShardStateAction extends AbstractComponent {
});
}
private void innerShardStarted(final ShardRouting shardRouting, final String reason) {
if (logger.isDebugEnabled()) {
logger.debug("received shard started for {}, reason [{}]", shardRouting, reason);
}
private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
// this is to optimize the number of "started" events we generate, and batch them
// possibly, we can do time based batching as well, but usually, we would want to
// process started events as fast as possible, to make shards available
startedShardsQueue.add(shardRouting);
startedShardsQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH,
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
List<ShardRouting> shards = new ArrayList<ShardRouting>();
startedShardsQueue.drainTo(shards);
List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
startedShardsQueue.drainTo(shardRoutingEntries);
// nothing to process (a previous event has process it already)
if (shards.isEmpty()) {
return currentState;
}
// nothing to process (a previous event has processed it already)
if (shardRoutingEntries.isEmpty()) {
return currentState;
}
RoutingTable routingTable = currentState.routingTable();
RoutingTable routingTable = currentState.routingTable();
MetaData metaData = currentState.getMetaData();
for (int i = 0; i < shards.size(); i++) {
ShardRouting shardRouting = shards.get(i);
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
// if there is no routing table, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexRoutingTable == null) {
shards.remove(i);
} else {
// find the one that maps to us, if its already started, no need to do anything...
// the shard might already be started since the nodes that is starting the shards might get cluster events
// with the shard still initializing, and it will try and start it again (until the verification comes)
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id());
for (ShardRouting entry : indexShardRoutingTable) {
if (shardRouting.currentNodeId().equals(entry.currentNodeId())) {
// we found the same shard that exists on the same node id
if (entry.started()) {
// already started, do nothing here...
shards.remove(i);
List<ShardRouting> shardRoutingToBeApplied = new ArrayList<ShardRouting>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
try {
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
// if there is no metadata, no routing table or the current index is not of the right uuid, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexMetaData == null) {
continue;
}
if (indexRoutingTable == null) {
continue;
}
if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
logger.debug("{} ignoring shard started, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry);
continue;
}
// find the one that maps to us, if its already started, no need to do anything...
// the shard might already be started since the nodes that is starting the shards might get cluster events
// with the shard still initializing, and it will try and start it again (until the verification comes)
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id());
for (ShardRouting entry : indexShardRoutingTable) {
if (shardRouting.currentNodeId().equals(entry.currentNodeId())) {
// we found the same shard that exists on the same node id
if (entry.initializing()) {
// shard not started, add it to the shards to be processed.
shardRoutingToBeApplied.add(shardRouting);
logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry);
} else {
logger.debug("{} ignoring shard started event for {}, current state: {}", shardRouting.shardId(), shardRoutingEntry, entry.state());
}
} else {
shardRoutingToBeApplied.add(shardRouting);
logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry);
}
}
} catch (Throwable t) {
logger.error("{} unexpected failure while processing shard started [{}]", t, shardRouting.shardId(), shardRouting);
}
}
if (shardRoutingToBeApplied.isEmpty()) {
return currentState;
}
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shardRoutingToBeApplied, true);
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
}
if (shards.isEmpty()) {
return currentState;
}
if (logger.isDebugEnabled()) {
logger.debug("applying started shards {}, reason [{}]", shards, reason);
}
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, true);
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
}
private class ShardFailedTransportHandler extends BaseTransportRequestHandler<ShardRoutingEntry> {
@ -212,7 +264,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardFailed(request.shardRouting, request.reason);
innerShardFailed(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@ -233,7 +285,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardStarted(request.shardRouting, request.reason);
innerShardStarted(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@ -247,14 +299,17 @@ public class ShardStateAction extends AbstractComponent {
private ShardRouting shardRouting;
private String indexUUID;
private String reason;
private ShardRoutingEntry() {
}
private ShardRoutingEntry(ShardRouting shardRouting, String reason) {
private ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) {
this.shardRouting = shardRouting;
this.reason = reason;
this.indexUUID = indexUUID;
}
@Override
@ -262,6 +317,9 @@ public class ShardStateAction extends AbstractComponent {
super.readFrom(in);
shardRouting = readShardRoutingEntry(in);
reason = in.readString();
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
indexUUID = in.readOptionalString();
}
}
@Override
@ -269,6 +327,19 @@ public class ShardStateAction extends AbstractComponent {
super.writeTo(out);
shardRouting.writeTo(out);
out.writeString(reason);
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
out.writeOptionalString(indexUUID);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(shardRouting.toString());
if (indexUUID != null) {
sb.append(", indexUUID [").append(indexUUID).append("]");
}
sb.append(", reason [").append(reason).append("]");
return sb.toString();
}
}
}

View File

@ -56,6 +56,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.*;
*/
public class IndexMetaData {
public interface Custom {
String type();
@ -155,6 +156,7 @@ public class IndexMetaData {
public static final String SETTING_BLOCKS_WRITE = "index.blocks.write";
public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata";
public static final String SETTING_VERSION_CREATED = "index.version.created";
public static final String SETTING_UUID = "index.uuid";
private final String index;
private final long version;
@ -212,6 +214,21 @@ public class IndexMetaData {
return index;
}
public String getUUID() {
return settings.get(SETTING_UUID);
}
/**
* Test whether the current index UUID is the same as the given one. Incoming nulls always return true.
*/
public boolean isSameUUID(@Nullable String otherUUID) {
if (otherUUID == null || getUUID() == null) {
return true;
}
return otherUUID.equals(getUUID());
}
public String getIndex() {
return index();
}
@ -330,16 +347,30 @@ public class IndexMetaData {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IndexMetaData that = (IndexMetaData) o;
if (!aliases.equals(that.aliases)) return false;
if (!index.equals(that.index)) return false;
if (!mappings.equals(that.mappings)) return false;
if (!settings.equals(that.settings)) return false;
if (state != that.state) return false;
if (!aliases.equals(that.aliases)) {
return false;
}
if (!index.equals(that.index)) {
return false;
}
if (!mappings.equals(that.mappings)) {
return false;
}
if (!settings.equals(that.settings)) {
return false;
}
if (state != that.state) {
return false;
}
return true;
}

View File

@ -253,6 +253,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
indexSettingsBuilder.put(SETTING_VERSION_CREATED, version);
indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID());
Settings actualIndexSettings = indexSettingsBuilder.build();

View File

@ -86,4 +86,6 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {
Injector shardInjector(int shardId);
Injector shardInjectorSafe(int shardId) throws IndexShardMissingException;
String indexUUID();
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
@ -292,6 +293,11 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return shardInjector;
}
@Override
public String indexUUID() {
return indexSettings.get(IndexMetaData.SETTING_UUID);
}
@Override
public synchronized IndexShard createShard(int sShardId) throws ElasticSearchException {
/*

View File

@ -174,6 +174,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
cleanFailedShards(event);
cleanMismatchedIndexUUIDs(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
@ -217,6 +218,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void cleanMismatchedIndexUUIDs(final ClusterChangedEvent event) {
for (IndexService indexService : indicesService) {
IndexMetaData indexMetaData = event.state().metaData().index(indexService.index().name());
if (indexMetaData == null) {
// got deleted on us, will be deleted later
continue;
}
if (!indexMetaData.isSameUUID(indexService.indexUUID())) {
logger.debug("[{}] mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated", indexMetaData.index());
removeIndex(indexMetaData.index(), "mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated");
}
}
}
private void applyCleanedIndices(final ClusterChangedEvent event) {
// handle closed indices, since they are not allocated on a node once they are closed
// so applyDeletedIndices might not take them into account
@ -506,6 +521,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// got deleted on us, ignore
continue;
}
final IndexMetaData indexMetaData = event.state().metaData().index(shardRouting.index());
if (indexMetaData == null) {
// the index got deleted on the metadata, we will clean it later in the apply deleted method call
continue;
}
final int shardId = shardRouting.id();
@ -514,7 +534,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// the master thinks we are started, but we don't have this shard at all, mark it as failed
logger.warn("[{}][{}] master [{}] marked shard as started, but shard has not been created, mark shard as failed", shardRouting.index(), shardId, nodes.masterNode());
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, "master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed");
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed");
}
continue;
}
@ -542,7 +563,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
if (shardRouting.initializing()) {
applyInitializingShard(routingTable, nodes, routingTable.index(shardRouting.index()).shard(shardRouting.id()), shardRouting);
applyInitializingShard(routingTable, nodes, indexMetaData, routingTable.index(shardRouting.index()).shard(shardRouting.id()), shardRouting);
}
}
}
@ -586,7 +607,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void applyInitializingShard(final RoutingTable routingTable, final DiscoveryNodes nodes, final IndexShardRoutingTable indexShardRouting, final ShardRouting shardRouting) throws ElasticSearchException {
private void applyInitializingShard(final RoutingTable routingTable, final DiscoveryNodes nodes, final IndexMetaData indexMetaData, final IndexShardRoutingTable indexShardRouting, final ShardRouting shardRouting) throws ElasticSearchException {
final IndexService indexService = indicesService.indexService(shardRouting.index());
if (indexService == null) {
// got deleted on us, ignore
@ -602,7 +623,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] master [{}] marked shard as initializing, but shard already created, mark shard as started");
}
shardStateAction.shardStarted(shardRouting, "master " + nodes.masterNode() + " marked shard as initializing, but shard already started, mark shard as started");
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard already started, mark shard as started");
return;
} else {
if (indexShard.ignoreRecoveryAttempt()) {
@ -636,7 +658,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
}
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, "Failed to create shard, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to create shard, message [" + detailedMessage(e) + "]");
return;
}
}
@ -659,9 +681,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService));
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
break;
}
break;
@ -676,7 +698,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, "after recovery from gateway");
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery from gateway");
}
@Override
@ -685,7 +707,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
}
});
} else {
@ -695,9 +717,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we don't mark this one as relocated at the end, requests in any case are routed to both when its relocating
// and that way we handle the edge case where its mark as relocated, and we might need to roll it back...
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService));
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
}
}
}
@ -706,20 +728,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {
private final StartRecoveryRequest request;
private final ShardRouting shardRouting;
private final IndexService indexService;
private final IndexMetaData indexMetaData;
private PeerRecoveryListener(StartRecoveryRequest request, ShardRouting shardRouting, IndexService indexService) {
private PeerRecoveryListener(StartRecoveryRequest request, ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.request = request;
this.shardRouting = shardRouting;
this.indexService = indexService;
this.indexMetaData = indexMetaData;
}
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, "after recovery (replica) from node [" + request.sourceNode() + "]");
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + request.sourceNode() + "]");
}
@Override
@ -750,11 +772,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(indexService, shardRouting, sendShardFailure, e);
handleRecoveryFailure(indexService, indexMetaData, shardRouting, sendShardFailure, e);
}
}
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
private void handleRecoveryFailure(IndexService indexService, IndexMetaData indexMetaData, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
@ -769,7 +791,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (sendShardFailure) {
try {
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(failure) + "]");
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to start shard, message [" + detailedMessage(failure) + "]");
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
}
@ -796,7 +818,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
public void onFailedEngine(final ShardId shardId, final Throwable failure) {
ShardRouting shardRouting = null;
final IndexService indexService = indicesService.indexService(shardId.index().name());
String indexUUID = null;
if (indexService != null) {
indexUUID = indexService.indexUUID();
IndexShard indexShard = indexService.shard(shardId.id());
if (indexShard != null) {
shardRouting = indexShard.routingEntry();
@ -807,6 +831,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
return;
}
final ShardRouting fShardRouting = shardRouting;
final String finalIndexUUID = indexUUID;
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
@ -822,7 +847,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
try {
failedShards.put(fShardRouting.shardId(), new FailedShard(fShardRouting.version()));
shardStateAction.shardFailed(fShardRouting, "engine failure, message [" + detailedMessage(failure) + "]");
shardStateAction.shardFailed(fShardRouting, finalIndexUUID, "engine failure, message [" + detailedMessage(failure) + "]");
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", e1, indexService.index().name(), shardId.id());
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperTestUtils;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -53,7 +54,6 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.mapper.MapperTestUtils;
import org.junit.Test;
import java.io.IOException;
@ -388,6 +388,11 @@ public class SimpleIdCacheTests {
return null;
}
@Override
public String indexUUID() {
return null;
}
@Override
public Index index() {
return null;