From 70726b268fcc4ad0d9b78e8d520a4197b477caf9 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 25 Feb 2010 18:40:29 +0200 Subject: [PATCH] add validation on routing table, expose it in cluster health, and add more information when shard start or fail --- .../cluster/health/ClusterHealthResponse.java | 40 ++++- .../cluster/health/ClusterIndexHealth.java | 24 ++- .../health/TransportClusterHealthAction.java | 44 +++-- .../indices/flush/TransportFlushAction.java | 2 +- .../optimize/TransportOptimizeAction.java | 2 +- .../refresh/TransportRefreshAction.java | 2 +- .../status/TransportIndicesStatusAction.java | 2 +- .../action/count/TransportCountAction.java | 2 +- .../action/get/TransportGetAction.java | 2 +- ...nsportShardReplicationOperationAction.java | 10 +- .../action/terms/TransportTermsAction.java | 2 +- .../TransportClientClusterModule.java | 4 +- .../elasticsearch/cluster/ClusterModule.java | 3 +- .../elasticsearch/cluster/ClusterState.java | 16 +- .../action/shard/ShardStateAction.java | 88 ++++++--- .../cluster/routing/IndexRoutingTable.java | 32 +++- .../cluster/routing/RoutingService.java | 4 + .../cluster/routing/RoutingTable.java | 22 +++ .../routing/RoutingTableValidation.java | 169 ++++++++++++++++++ .../routing/RoutingValidationException.java | 11 +- .../DefaultShardsRoutingStrategy.java | 6 +- .../InternalClusterService.java} | 32 +++- .../org/elasticsearch/index/IndexService.java | 2 +- .../index/InternalIndexService.java | 2 +- .../gateway/IndexShardGatewayService.java | 7 +- .../index/gateway/fs/FsIndexShardGateway.java | 4 +- .../gateway/none/NoneIndexShardGateway.java | 4 +- .../index/shard/IndexShardManagement.java | 1 + .../index/shard/IndexShardModule.java | 2 + .../index/shard/recovery/RecoveryAction.java | 10 +- .../index/shard/{ => service}/IndexShard.java | 5 +- .../{ => service}/InternalIndexShard.java | 11 +- .../index/translog/Translog.java | 2 +- .../indices/IndicesMemoryCleaner.java | 4 +- .../cluster/IndicesClusterStateService.java | 24 +-- .../health/RestClusterHealthAction.java | 33 ++++ .../cluster/state/RestClusterStateAction.java | 4 +- .../elasticsearch/search/SearchService.java | 2 +- .../util/component/AbstractComponent.java | 10 +- .../index/shard/SimpleIndexShardTests.java | 2 + .../IndexLifecycleActionTests.java | 42 ++--- .../testng/src/main/java/log4j.properties | 6 +- 42 files changed, 560 insertions(+), 136 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTableValidation.java rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/{DefaultClusterService.java => service/InternalClusterService.java} (82%) rename modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/{ => service}/IndexShard.java (92%) rename modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/{ => service}/InternalIndexShard.java (97%) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java index fef0102b85a..ef5cbaa02fc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.health; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import org.elasticsearch.action.ActionResponse; @@ -26,8 +27,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Map; +import static com.google.common.collect.Lists.*; import static org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth.*; /** @@ -47,19 +50,41 @@ public class ClusterHealthResponse implements ActionResponse, Iterable validationFailures; + Map indices = Maps.newHashMap(); ClusterHealthResponse() { } - public ClusterHealthResponse(String clusterName) { + public ClusterHealthResponse(String clusterName, List validationFailures) { this.clusterName = clusterName; + this.validationFailures = validationFailures; } public String clusterName() { return clusterName; } + /** + * The validation failures on the cluster level (without index validation failures). + */ + public List validationFailures() { + return this.validationFailures; + } + + /** + * All the validation failures, including index level validation failures. + */ + public List allValidationFailures() { + List allFailures = newArrayList(validationFailures()); + for (ClusterIndexHealth indexHealth : indices.values()) { + allFailures.addAll(indexHealth.validationFailures()); + } + return allFailures; + } + + public int activeShards() { return activeShards; } @@ -103,6 +128,14 @@ public class ClusterHealthResponse implements ActionResponse, Iterable, Streama final Map shards = Maps.newHashMap(); + List validationFailures; + private ClusterIndexHealth() { } - public ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas) { + public ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas, List validationFailures) { this.index = index; this.numberOfShards = numberOfShards; this.numberOfReplicas = numberOfReplicas; + this.validationFailures = validationFailures; } public String index() { return index; } + public List validationFailures() { + return this.validationFailures; + } + public int numberOfShards() { return numberOfShards; } @@ -116,6 +125,14 @@ public class ClusterIndexHealth implements Iterable, Streama ClusterShardHealth shardHealth = readClusterShardHealth(in); shards.put(shardHealth.id(), shardHealth); } + size = in.readInt(); + if (size == 0) { + validationFailures = ImmutableList.of(); + } else { + for (int i = 0; i < size; i++) { + validationFailures.add(in.readUTF()); + } + } } @Override public void writeTo(DataOutput out) throws IOException { @@ -131,5 +148,10 @@ public class ClusterIndexHealth implements Iterable, Streama for (ClusterShardHealth shardHealth : this) { shardHealth.writeTo(out); } + + out.writeInt(validationFailures.size()); + for (String failure : validationFailures) { + out.writeUTF(failure); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index cea3492d4f9..2e488836684 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTableValidation; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.timer.TimerService; @@ -105,8 +106,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc } private ClusterHealthResponse clusterHealth(ClusterHealthRequest request) { - ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value()); ClusterState clusterState = clusterService.state(); + RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData()); + ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures()); + String[] indices = processIndices(clusterState, request.indices()); for (String index : indices) { IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); @@ -114,7 +117,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc if (indexRoutingTable == null) { continue; } - ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas()); + ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), validation.indexFailures(indexMetaData.index())); for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id()); @@ -151,13 +154,17 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc } // update the index status indexHealth.status = ClusterHealthStatus.GREEN; - for (ClusterShardHealth shardHealth : indexHealth) { - if (shardHealth.status() == ClusterHealthStatus.RED) { - indexHealth.status = ClusterHealthStatus.RED; - break; - } - if (shardHealth.status() == ClusterHealthStatus.YELLOW) { - indexHealth.status = ClusterHealthStatus.YELLOW; + if (!indexHealth.validationFailures().isEmpty()) { + indexHealth.status = ClusterHealthStatus.RED; + } else { + for (ClusterShardHealth shardHealth : indexHealth) { + if (shardHealth.status() == ClusterHealthStatus.RED) { + indexHealth.status = ClusterHealthStatus.RED; + break; + } + if (shardHealth.status() == ClusterHealthStatus.YELLOW) { + indexHealth.status = ClusterHealthStatus.YELLOW; + } } } @@ -171,17 +178,20 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc } response.status = ClusterHealthStatus.GREEN; - for (ClusterIndexHealth indexHealth : response) { - if (indexHealth.status() == ClusterHealthStatus.RED) { - response.status = ClusterHealthStatus.RED; - break; - } - if (indexHealth.status() == ClusterHealthStatus.YELLOW) { - response.status = ClusterHealthStatus.YELLOW; + if (!response.validationFailures().isEmpty()) { + response.status = ClusterHealthStatus.RED; + } else { + for (ClusterIndexHealth indexHealth : response) { + if (indexHealth.status() == ClusterHealthStatus.RED) { + response.status = ClusterHealthStatus.RED; + break; + } + if (indexHealth.status() == ClusterHealthStatus.YELLOW) { + response.status = ClusterHealthStatus.YELLOW; + } } } - return response; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 29f4cbe4861..1dc941a6d59 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index 6be7388a954..12d690eed3e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index 533e5bacc71..7c87ee5caab 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index cf24d4e4386..65a07600306 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -32,7 +32,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.InternalIndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index a433ccc58d7..1c23934cf5f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -30,7 +30,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 5748b76cac3..c880e599f61 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -24,7 +24,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.TransportSingleOperationAction; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index c880ecb243e..c9a66bc7020 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -35,8 +35,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -52,6 +52,8 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.ExceptionsHelper.*; + /** * @author kimchy (Shay Banon) */ @@ -394,7 +396,7 @@ public abstract class TransportShardReplicationOperationActionNOTE, the routing nodes + * are immutable, use them just for read operations + */ + public RoutingNodes readOnlyRoutingNodes() { if (routingNodes != null) { return routingNodes; } @@ -132,11 +137,6 @@ public class ClusterState { return this; } - Builder incrementVersion() { - this.version++; - return this; - } - public ClusterState build() { return new ClusterState(version, metaData, routingTable, nodes); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 8a54e239706..5ae4957399b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -25,7 +25,10 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.node.Nodes; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportRequestHandler; @@ -33,11 +36,17 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.VoidTransportResponseHandler; import org.elasticsearch.util.component.AbstractComponent; +import org.elasticsearch.util.io.Streamable; import org.elasticsearch.util.io.VoidStreamable; import org.elasticsearch.util.settings.Settings; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import static com.google.common.collect.Lists.*; import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*; /** * @author kimchy (Shay Banon) @@ -64,41 +73,41 @@ public class ShardStateAction extends AbstractComponent { transportService.registerHandler(ShardFailedTransportHandler.ACTION, new ShardFailedTransportHandler()); } - public void shardFailed(final ShardRouting shardRouting) throws ElasticSearchException { - logger.warn("Sending failed shard for {}", shardRouting); + public void shardFailed(final ShardRouting shardRouting, final String reason) throws ElasticSearchException { + logger.warn("Sending failed shard for {}, reason [{}]", shardRouting, reason); Nodes nodes = clusterService.state().nodes(); if (nodes.localNodeMaster()) { threadPool.execute(new Runnable() { @Override public void run() { - innerShardFailed(shardRouting); + innerShardFailed(shardRouting, reason); } }); } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), - ShardFailedTransportHandler.ACTION, shardRouting, VoidTransportResponseHandler.INSTANCE); + ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), VoidTransportResponseHandler.INSTANCE); } } - public void shardStarted(final ShardRouting shardRouting) throws ElasticSearchException { + public void shardStarted(final ShardRouting shardRouting, final String reason) throws ElasticSearchException { if (logger.isDebugEnabled()) { - logger.debug("Sending shard started for {}", shardRouting); + logger.debug("Sending shard started for {}, reason [{}]", shardRouting, reason); } Nodes nodes = clusterService.state().nodes(); if (nodes.localNodeMaster()) { threadPool.execute(new Runnable() { @Override public void run() { - innerShardStarted(shardRouting); + innerShardStarted(shardRouting, reason); } }); } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), - ShardStartedTransportHandler.ACTION, shardRouting, VoidTransportResponseHandler.INSTANCE); + ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), VoidTransportResponseHandler.INSTANCE); } } - private void innerShardFailed(final ShardRouting shardRouting) { - logger.warn("Received shard failed for {}", shardRouting); - clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + ")", new ClusterStateUpdateTask() { + private void innerShardFailed(final ShardRouting shardRouting, final String reason) { + logger.warn("Received shard failed for {}, reason [{}]", shardRouting, reason); + clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { RoutingTable routingTable = currentState.routingTable(); IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); @@ -108,7 +117,7 @@ public class ShardStateAction extends AbstractComponent { return currentState; } if (logger.isDebugEnabled()) { - logger.debug("Applying failed shard {}", shardRouting); + logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason); } RoutingTable prevRoutingTable = currentState.routingTable(); RoutingTable newRoutingTable = shardsRoutingStrategy.applyFailedShards(currentState, newArrayList(shardRouting)); @@ -120,11 +129,11 @@ public class ShardStateAction extends AbstractComponent { }); } - private void innerShardStarted(final ShardRouting shardRouting) { + private void innerShardStarted(final ShardRouting shardRouting, final String reason) { if (logger.isDebugEnabled()) { - logger.debug("Received shard started for {}", shardRouting); + logger.debug("Received shard started for {}, reason [{}]", shardRouting, reason); } - clusterService.submitStateUpdateTask("shard-started (" + shardRouting + ")", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { RoutingTable routingTable = currentState.routingTable(); IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); @@ -147,7 +156,7 @@ public class ShardStateAction extends AbstractComponent { } } if (logger.isDebugEnabled()) { - logger.debug("Applying started shard {}", shardRouting); + logger.debug("Applying started shard {}, reason [{}]", shardRouting, reason); } RoutingTable newRoutingTable = shardsRoutingStrategy.applyStartedShards(currentState, newArrayList(shardRouting)); if (routingTable == newRoutingTable) { @@ -158,31 +167,56 @@ public class ShardStateAction extends AbstractComponent { }); } - private class ShardFailedTransportHandler extends BaseTransportRequestHandler { + private class ShardFailedTransportHandler extends BaseTransportRequestHandler { static final String ACTION = "cluster/shardFailure"; - @Override public ShardRouting newInstance() { - return new ImmutableShardRouting(); + @Override public ShardRoutingEntry newInstance() { + return new ShardRoutingEntry(); } - @Override public void messageReceived(ShardRouting request, TransportChannel channel) throws Exception { - innerShardFailed(request); + @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { + innerShardFailed(request.shardRouting, request.reason); channel.sendResponse(VoidStreamable.INSTANCE); } } - private class ShardStartedTransportHandler extends BaseTransportRequestHandler { + private class ShardStartedTransportHandler extends BaseTransportRequestHandler { static final String ACTION = "cluster/shardStarted"; - @Override public ShardRouting newInstance() { - return new ImmutableShardRouting(); + @Override public ShardRoutingEntry newInstance() { + return new ShardRoutingEntry(); } - @Override public void messageReceived(ShardRouting request, TransportChannel channel) throws Exception { - innerShardStarted(request); + @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { + innerShardStarted(request.shardRouting, request.reason); channel.sendResponse(VoidStreamable.INSTANCE); } } + + private static class ShardRoutingEntry implements Streamable { + + private ShardRouting shardRouting; + + private String reason; + + private ShardRoutingEntry() { + } + + private ShardRoutingEntry(ShardRouting shardRouting, String reason) { + this.shardRouting = shardRouting; + this.reason = reason; + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + shardRouting = readShardRoutingEntry(in); + reason = in.readUTF(); + } + + @Override public void writeTo(DataOutput out) throws IOException { + shardRouting.writeTo(out); + out.writeUTF(reason); + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index e8ffb4591c0..f41382e9fde 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -20,8 +20,10 @@ package org.elasticsearch.cluster.routing; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.util.IdentityHashSet; import org.elasticsearch.util.concurrent.Immutable; @@ -30,6 +32,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * @author kimchy (Shay Banon) @@ -52,6 +55,33 @@ public class IndexRoutingTable implements Iterable { return this.index; } + public void validate(RoutingTableValidation validation, MetaData metaData) { + if (!metaData.hasIndex(index())) { + validation.addIndexFailure(index(), "Exists in routing does not exists in metadata"); + return; + } + IndexMetaData indexMetaData = metaData.index(index()); + // check the number of shards + if (indexMetaData.numberOfShards() != shards().size()) { + Set expected = Sets.newHashSet(); + for (int i = 0; i < indexMetaData.numberOfShards(); i++) { + expected.add(i); + } + for (IndexShardRoutingTable indexShardRoutingTable : this) { + expected.remove(indexShardRoutingTable.shardId().id()); + } + validation.addIndexFailure(index(), "Wrong number of shards in routing table, missing: " + expected); + } + // check the replicas + for (IndexShardRoutingTable indexShardRoutingTable : this) { + int routingNumberOfReplicas = indexShardRoutingTable.size() - 1; + if (routingNumberOfReplicas != indexMetaData.numberOfReplicas()) { + validation.addIndexFailure(index(), "Shard [" + indexShardRoutingTable.shardId().id() + + "] routing table has wrong number of replicas, expected [" + indexMetaData.numberOfReplicas() + "], got [" + routingNumberOfReplicas + "]"); + } + } + } + @Override public UnmodifiableIterator iterator() { return shards.values().iterator(); } @@ -174,7 +204,7 @@ public class IndexRoutingTable implements Iterable { public String prettyPrint() { StringBuilder sb = new StringBuilder("-- Index[" + index + "]\n"); for (IndexShardRoutingTable indexShard : this) { - sb.append("----ShardId[").append(indexShard.shardId()).append("]\n"); + sb.append("----ShardId[").append(indexShard.shardId().index().name()).append("][").append(indexShard.shardId().id()).append("]\n"); for (ShardRouting shard : indexShard) { sb.append("--------").append(shard.shortSummary()).append("\n"); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 1f507c04ef1..feb2364562b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -140,6 +140,10 @@ public class RoutingService extends AbstractComponent implements ClusterStateLis clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(currentState); + if (newRoutingTable == currentState.routingTable()) { + // no state changed + return currentState; + } return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); } }); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index f30c25d5548..cde81aadd09 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.UnmodifiableIterator; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; @@ -70,6 +71,27 @@ public class RoutingTable implements Iterable { return new RoutingNodes(metaData, this); } + public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException { + RoutingTableValidation validation = validate(metaData); + if (!validation.valid()) { + throw new RoutingValidationException(validation); + } + return this; + } + + public RoutingTableValidation validate(MetaData metaData) { + RoutingTableValidation validation = new RoutingTableValidation(); + for (IndexMetaData indexMetaData : metaData) { + if (!indicesRouting.containsKey(indexMetaData.index())) { + validation.addIndexFailure(indexMetaData.index(), "Exists in metadata and does not exists in routing table"); + } + } + for (IndexRoutingTable indexRoutingTable : this) { + indexRoutingTable.validate(validation, metaData); + } + return validation; + } + /** * All the shards (replicas) for the provided indices. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTableValidation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTableValidation.java new file mode 100644 index 00000000000..2f9e9da7dea --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTableValidation.java @@ -0,0 +1,169 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.elasticsearch.util.io.Streamable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Lists.*; +import static com.google.common.collect.Maps.*; + +/** + * @author kimchy (shay.banon) + */ +public class RoutingTableValidation implements Serializable, Streamable { + + private boolean valid = true; + + private List failures; + + private Map> indicesFailures; + + public RoutingTableValidation() { + } + + public boolean valid() { + return valid; + } + + public List allFailures() { + if (failures().isEmpty() && indicesFailures().isEmpty()) { + return ImmutableList.of(); + } + List allFailures = newArrayList(failures()); + for (Map.Entry> entry : indicesFailures().entrySet()) { + for (String failure : entry.getValue()) { + allFailures.add("Index [" + entry.getKey() + "]: " + failure); + } + } + return allFailures; + } + + public List failures() { + if (failures == null) { + return ImmutableList.of(); + } + return failures; + } + + public Map> indicesFailures() { + if (indicesFailures == null) { + return ImmutableMap.of(); + } + return indicesFailures; + } + + public List indexFailures(String index) { + if (indicesFailures == null) { + return ImmutableList.of(); + } + List indexFailures = indicesFailures.get(index); + if (indexFailures == null) { + return ImmutableList.of(); + } + return indexFailures; + } + + public void addFailure(String failure) { + valid = false; + if (failures == null) { + failures = newArrayList(); + } + failures.add(failure); + } + + public void addIndexFailure(String index, String failure) { + valid = false; + if (indicesFailures == null) { + indicesFailures = newHashMap(); + } + List indexFailures = indicesFailures.get(index); + if (indexFailures == null) { + indexFailures = Lists.newArrayList(); + indicesFailures.put(index, indexFailures); + } + indexFailures.add(failure); + } + + @Override public String toString() { + return allFailures().toString(); + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + valid = in.readBoolean(); + int size = in.readInt(); + if (size == 0) { + failures = ImmutableList.of(); + } else { + failures = Lists.newArrayListWithCapacity(size); + for (int i = 0; i < size; i++) { + failures.add(in.readUTF()); + } + } + size = in.readInt(); + if (size == 0) { + indicesFailures = ImmutableMap.of(); + } else { + indicesFailures = newHashMap(); + for (int i = 0; i < size; i++) { + String index = in.readUTF(); + int size2 = in.readInt(); + List indexFailures = newArrayListWithCapacity(size2); + for (int j = 0; j < size2; j++) { + indexFailures.add(in.readUTF()); + } + indicesFailures.put(index, indexFailures); + } + } + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeBoolean(valid); + if (failures == null) { + out.writeInt(0); + } else { + out.writeInt(failures.size()); + for (String failure : failures) { + out.writeUTF(failure); + } + } + if (indicesFailures == null) { + out.writeInt(0); + } else { + out.writeInt(indicesFailures.size()); + for (Map.Entry> entry : indicesFailures.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeInt(entry.getValue().size()); + for (String failure : entry.getValue()) { + out.writeUTF(failure); + } + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingValidationException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingValidationException.java index 328e10328cd..5f9c57c2970 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingValidationException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingValidationException.java @@ -24,11 +24,14 @@ package org.elasticsearch.cluster.routing; */ public class RoutingValidationException extends RoutingException { - public RoutingValidationException(String message) { - super(message); + private final RoutingTableValidation validation; + + public RoutingValidationException(RoutingTableValidation validation) { + super(validation.toString()); + this.validation = validation; } - public RoutingValidationException(String message, Throwable cause) { - super(message, cause); + public RoutingTableValidation validation() { + return this.validation; } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java index c231aafe707..c2241742191 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java @@ -40,7 +40,7 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy { if (!applyStartedShards(routingNodes, startedShardEntries)) { return clusterState.routingTable(); } - return new RoutingTable.Builder().updateNodes(routingNodes).build(); + return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); } @Override public RoutingTable applyFailedShards(ClusterState clusterState, Iterable failedShardEntries) { @@ -48,7 +48,7 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy { if (!applyFailedShards(routingNodes, failedShardEntries)) { return clusterState.routingTable(); } - return new RoutingTable.Builder().updateNodes(routingNodes).build(); + return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); } @Override public RoutingTable reroute(ClusterState clusterState) { @@ -78,7 +78,7 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy { return clusterState.routingTable(); } - return new RoutingTable.Builder().updateNodes(routingNodes).build(); + return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); } private boolean rebalance(RoutingNodes routingNodes) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/DefaultClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java similarity index 82% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/DefaultClusterService.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index bbfe0d280a6..532914f20e0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/DefaultClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -17,10 +17,11 @@ * under the License. */ -package org.elasticsearch.cluster; +package org.elasticsearch.cluster.service; import com.google.inject.Inject; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.Nodes; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; @@ -44,7 +45,7 @@ import static org.elasticsearch.util.concurrent.DynamicExecutors.*; /** * @author kimchy (Shay Banon) */ -public class DefaultClusterService extends AbstractComponent implements ClusterService { +public class InternalClusterService extends AbstractComponent implements ClusterService { private final Lifecycle lifecycle = new Lifecycle(); @@ -66,7 +67,7 @@ public class DefaultClusterService extends AbstractComponent implements ClusterS private volatile ClusterState clusterState = newClusterStateBuilder().build(); - @Inject public DefaultClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool) { + @Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool) { super(settings); this.transportService = transportService; this.discoveryService = discoveryService; @@ -90,7 +91,7 @@ public class DefaultClusterService extends AbstractComponent implements ClusterS for (final TimeoutHolder holder : clusterStateTimeoutListeners) { if ((timestamp - holder.timestamp) > holder.timeout.millis()) { clusterStateTimeoutListeners.remove(holder); - DefaultClusterService.this.threadPool.execute(new Runnable() { + InternalClusterService.this.threadPool.execute(new Runnable() { @Override public void run() { holder.listener.onTimeout(holder.timeout); } @@ -158,18 +159,33 @@ public class DefaultClusterService extends AbstractComponent implements ClusterS return; } ClusterState previousClusterState = clusterState; - clusterState = updateTask.execute(previousClusterState); + try { + clusterState = updateTask.execute(previousClusterState); + } catch (Exception e) { + StringBuilder sb = new StringBuilder("Failed to execute cluster state update, state:\nVersion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); + sb.append(clusterState.nodes().prettyPrint()); + sb.append(clusterState.routingTable().prettyPrint()); + sb.append(clusterState.readOnlyRoutingNodes().prettyPrint()); + logger.warn(sb.toString(), e); + return; + } if (previousClusterState != clusterState) { if (clusterState.nodes().localNodeMaster()) { // only the master controls the version numbers - clusterState = newClusterStateBuilder().state(clusterState).incrementVersion().build(); + clusterState = new ClusterState(clusterState.version() + 1, clusterState.metaData(), clusterState.routingTable(), clusterState.nodes()); + } else { + // we got this cluster state from the master, filter out based on versions (don't call listeners) + if (clusterState.version() < previousClusterState.version()) { + logger.info("Got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring"); + return; + } } if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("Cluster State updated, version [").append(clusterState.version()).append("], source [").append(source).append("]\n"); + StringBuilder sb = new StringBuilder("Cluster State updated:\nVersion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); sb.append(clusterState.nodes().prettyPrint()); sb.append(clusterState.routingTable().prettyPrint()); - sb.append(clusterState.routingNodes().prettyPrint()); + sb.append(clusterState.readOnlyRoutingNodes().prettyPrint()); logger.trace(sb.toString()); } else if (logger.isDebugEnabled()) { logger.debug("Cluster state updated, version [{}], source [{}]", clusterState.version(), source); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/IndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/IndexService.java index b2d65639bf6..7cc2d6ff279 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/IndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/IndexService.java @@ -25,7 +25,7 @@ import org.elasticsearch.index.cache.filter.FilterCache; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.routing.OperationRouting; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import java.util.Set; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/InternalIndexService.java index e75524f681b..185f7007fcc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/InternalIndexService.java @@ -37,11 +37,11 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.routing.OperationRouting; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardManagement; import org.elasticsearch.index.shard.IndexShardModule; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.recovery.RecoveryAction; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreModule; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 0e521969e62..3fed539010d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -25,7 +25,12 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.*; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java index 23ce25f9b05..63712ef2231 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java @@ -31,9 +31,9 @@ import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException; import org.elasticsearch.index.gateway.RecoveryStatus; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.InternalIndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index ce195865608..1273fe1e1bd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -26,9 +26,9 @@ import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.gateway.RecoveryStatus; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.InternalIndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeValue; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java index 608119be6a2..085260224e0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.shard; import com.google.inject.Inject; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.jmx.JmxService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index 02354364432..6c5b96c1836 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -21,6 +21,8 @@ package org.elasticsearch.index.shard; import com.google.inject.AbstractModule; import org.elasticsearch.index.shard.recovery.RecoveryAction; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; /** * @author kimchy (Shay Banon) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java index 117db078d75..1c0d6517d48 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java @@ -29,6 +29,8 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.*; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.memory.MemorySnapshot; @@ -349,7 +351,13 @@ public class RecoveryAction extends AbstractIndexShardComponent { sendSnapshot(snapshot, true); if (startRecoveryRequest.markAsRelocated) { // TODO what happens if the recovery process fails afterwards, we need to mark this back to started - indexShard.relocated(); + try { + indexShard.relocated(); + } catch (IllegalIndexShardStateException e) { + // we can ignore this exception since, on the other node, when it moved to phase3 + // it will also send shard started, which might cause the index shard we work against + // to move be closed by the time we get to the the relocated method + } } stopWatch.stop(); logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java similarity index 92% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShard.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index f82240d308d..f047dc31243 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.shard; +package org.elasticsearch.index.shard.service; import org.apache.lucene.index.Term; import org.elasticsearch.ElasticSearchException; @@ -25,6 +25,9 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.shard.IndexShardComponent; +import org.elasticsearch.index.shard.IndexShardLifecycle; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.util.Nullable; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.concurrent.ThreadSafe; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java similarity index 97% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/InternalIndexShard.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index e839e8716b2..c26260b1963 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.shard; +package org.elasticsearch.index.shard.service; import com.google.inject.Inject; import org.apache.lucene.document.Document; @@ -39,6 +39,7 @@ import org.elasticsearch.index.query.IndexQueryParser; import org.elasticsearch.index.query.IndexQueryParserMissingException; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.*; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.TypeMissingException; @@ -95,6 +96,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I this.queryParserService = queryParserService; this.filterCache = filterCache; state = IndexShardState.CREATED; + logger.debug("Moved to state [CREATED]"); } public Store store() { @@ -143,6 +145,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I throw new IndexShardRecoveringException(shardId); } state = IndexShardState.RECOVERING; + logger.debug("Moved to state [RECOVERING]"); return returnValue; } } @@ -152,6 +155,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (this.state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } + logger.debug("Restored to state [{}] from state [{}]", stateToRestore, state); this.state = stateToRestore; } return this; @@ -162,6 +166,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (state != IndexShardState.STARTED) { throw new IndexShardNotStartedException(shardId, state); } + logger.debug("Moved to state [RELOCATED]"); state = IndexShardState.RELOCATED; } return this; @@ -180,6 +185,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } engine.start(); scheduleRefresherIfNeeded(); + logger.debug("Moved to state [STARTED]"); state = IndexShardState.STARTED; } return this; @@ -379,6 +385,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I refreshScheduledFuture = null; } } + logger.debug("Moved to state [CLOSED]"); state = IndexShardState.CLOSED; } } @@ -390,6 +397,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I engine.start(); applyTranslogOperations(operations); synchronized (mutex) { + logger.debug("Moved to state [STARTED] post recovery (from gateway)"); state = IndexShardState.STARTED; } scheduleRefresherIfNeeded(); @@ -406,6 +414,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I applyTranslogOperations(snapshot); if (phase3) { synchronized (mutex) { + logger.debug("Moved to state [STARTED] post recovery (from another shard)"); state = IndexShardState.STARTED; } scheduleRefresherIfNeeded(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 9a5441ead96..d329ce29b7f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -22,8 +22,8 @@ package org.elasticsearch.index.translog; import org.apache.lucene.index.Term; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardComponent; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.util.Nullable; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.Strings; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java index 6c7b7674011..726e87762c5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java @@ -24,9 +24,9 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.InternalIndexShard; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeValue; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c1a19ab23d6..212d69ccd31 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -43,11 +43,11 @@ import org.elasticsearch.index.gateway.IgnoreGatewayRecoveryException; import org.elasticsearch.index.gateway.IndexShardGatewayService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.InternalIndexShard; import org.elasticsearch.index.shard.recovery.IgnoreRecoveryException; import org.elasticsearch.index.shard.recovery.RecoveryAction; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.util.component.AbstractComponent; @@ -59,6 +59,7 @@ import java.util.Map; import java.util.Set; import static com.google.common.collect.Sets.*; +import static org.elasticsearch.ExceptionsHelper.*; /** * @author kimchy (Shay Banon) @@ -146,7 +147,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu RoutingTable routingTable = event.state().routingTable(); - RoutingNode routingNodes = event.state().routingNodes().nodesToShards().get(event.state().nodes().localNodeId()); + RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId()); if (routingNodes != null) { applyShards(routingNodes, routingTable, event.state().nodes()); } @@ -238,7 +239,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu if (!indexService.hasShard(shardId) && shardRouting.started()) { // the master thinks we are started, but we don't have this shard at all, mark it as failed logger.warn("[" + shardRouting.index() + "][" + shardRouting.shardId().id() + "] Master " + nodes.masterNode() + " marked shard as started, but shard have not been created, mark shard as failed"); - shardStateAction.shardFailed(shardRouting); + shardStateAction.shardFailed(shardRouting, "Master " + nodes.masterNode() + " marked shard as started, but shard have not been created, mark shard as failed"); continue; } @@ -268,7 +269,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu if (logger.isTraceEnabled()) { logger.trace("[" + shardRouting.index() + "][" + shardRouting.shardId().id() + "] Master " + nodes.masterNode() + " marked shard as initializing, but shard already started, mark shard as started"); } - shardStateAction.shardStarted(shardRouting); + shardStateAction.shardStarted(shardRouting, "Master " + nodes.masterNode() + " marked shard as initializing, but shard already started, mark shard as started"); return; } else { if (indexShard.ignoreRecoveryAttempt()) { @@ -293,7 +294,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu } catch (Exception e1) { logger.warn("Failed to delete shard after failed creation for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e1); } - shardStateAction.shardFailed(shardRouting); + shardStateAction.shardFailed(shardRouting, "Failed to create shard, message [" + detailedMessage(e) + "]"); return; } } @@ -323,7 +324,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu try { // we are recovering a backup from a primary, so no need to mark it as relocated recoveryAction.startRecovery(nodes.localNode(), node, false); - shardStateAction.shardStarted(shardRouting); + shardStateAction.shardStarted(shardRouting, "after recovery (backup) from node [" + node + "]"); } catch (IgnoreRecoveryException e) { // that's fine, since we might be called concurrently, just ignore this break; @@ -337,7 +338,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class); try { shardGatewayService.recover(); - shardStateAction.shardStarted(shardRouting); + shardStateAction.shardStarted(shardRouting, "after recovery from gateway"); } catch (IgnoreGatewayRecoveryException e) { // that's fine, we might be called concurrently, just ignore this, we already recovered } @@ -345,9 +346,10 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu // relocating primaries, recovery from the relocating shard Node node = nodes.get(shardRouting.relocatingNodeId()); try { - // we mark the primary we are going to recover from as relocated + // we mark the primary we are going to recover from as relocated at the end of phase 3 + // so operations will start moving to the new primary recoveryAction.startRecovery(nodes.localNode(), node, true); - shardStateAction.shardStarted(shardRouting); + shardStateAction.shardStarted(shardRouting, "after recovery (primary) from node [" + node + "]"); } catch (IgnoreRecoveryException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering } @@ -363,7 +365,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu } } try { - shardStateAction.shardFailed(shardRouting); + shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(e) + "]"); } catch (Exception e1) { logger.warn("Failed to mark shard as failed after a failed start for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java index 3c6d710a190..fa16bc23e3d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java @@ -88,6 +88,31 @@ public class RestClusterHealthAction extends BaseRestHandler { builder.field("activeShards", response.activeShards()); builder.field("relocatingShards", response.relocatingShards()); + if (!response.validationFailures().isEmpty()) { + builder.startArray("validationFailures"); + for (String validationFailure : response.validationFailures()) { + builder.string(validationFailure); + } + // if we don't print index level information, still print the index validation failures + // so we know why the status is red + if (fLevel == 0) { + for (ClusterIndexHealth indexHealth : response) { + builder.startObject(indexHealth.index()); + + if (!indexHealth.validationFailures().isEmpty()) { + builder.startArray("validationFailures"); + for (String validationFailure : indexHealth.validationFailures()) { + builder.string(validationFailure); + } + builder.endArray(); + } + + builder.endObject(); + } + } + builder.endArray(); + } + if (fLevel > 0) { builder.startObject("indices"); for (ClusterIndexHealth indexHealth : response) { @@ -100,6 +125,14 @@ public class RestClusterHealthAction extends BaseRestHandler { builder.field("activeShards", indexHealth.activeShards()); builder.field("relocatingShards", indexHealth.relocatingShards()); + if (!indexHealth.validationFailures().isEmpty()) { + builder.startArray("validationFailures"); + for (String validationFailure : indexHealth.validationFailures()) { + builder.string(validationFailure); + } + builder.endArray(); + } + if (fLevel > 1) { builder.startObject("shards"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java index 7f1e230c9b2..2c73059386f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java @@ -103,12 +103,12 @@ public class RestClusterStateAction extends BaseRestHandler { // routing nodes builder.startObject("routingNodes"); builder.startArray("unassigned"); - for (ShardRouting shardRouting : state.routingNodes().unassigned()) { + for (ShardRouting shardRouting : state.readOnlyRoutingNodes().unassigned()) { jsonShardRouting(builder, shardRouting); } builder.endArray(); builder.startObject("nodes"); - for (RoutingNode routingNode : state.routingNodes()) { + for (RoutingNode routingNode : state.readOnlyRoutingNodes()) { builder.startArray(routingNode.nodeId()); for (ShardRouting shardRouting : routingNode) { jsonShardRouting(builder, shardRouting); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 1aa453486bf..f9dc0e0177d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -29,7 +29,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.search.dfs.CachedDfSource; import org.elasticsearch.search.dfs.DfsPhase; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/component/AbstractComponent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/component/AbstractComponent.java index 07d6da32ffb..baa0f3ddd4b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/component/AbstractComponent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/component/AbstractComponent.java @@ -40,8 +40,14 @@ public class AbstractComponent { this.componentSettings = settings.getComponentSettings(getClass()); } - public AbstractComponent(Settings settings, Class componentClass) { - this.logger = Loggers.getLogger(componentClass, settings); + public AbstractComponent(Settings settings, Class customClass) { + this.logger = Loggers.getLogger(customClass, settings); + this.settings = settings; + this.componentSettings = settings.getComponentSettings(customClass); + } + + public AbstractComponent(Settings settings, Class loggerClass, Class componentClass) { + this.logger = Loggers.getLogger(loggerClass, settings); this.settings = settings; this.componentSettings = settings.getComponentSettings(componentClass); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java index c5ec76df4f2..fda657ea06c 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java @@ -31,6 +31,8 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.ram.RamStore; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java index c19d3b2211c..483492bfc0f 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java @@ -79,11 +79,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { ClusterState clusterState1 = clusterService1.state(); - RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + RoutingNode routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11)); clusterState1 = client("server1").admin().cluster().state(clusterState()).actionGet().state(); - routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11)); logger.info("Starting server2"); @@ -102,11 +102,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { Thread.sleep(200); clusterState1 = clusterService1.state(); - routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11)); ClusterState clusterState2 = clusterService2.state(); - RoutingNode routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + RoutingNode routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), equalTo(11)); logger.info("Starting server3"); @@ -128,15 +128,15 @@ public class IndexLifecycleActionTests extends AbstractServersTests { Thread.sleep(200); clusterState1 = clusterService1.state(); - routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(7), equalTo(8))); clusterState2 = clusterService2.state(); - routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(7), equalTo(8))); ClusterState clusterState3 = clusterService3.state(); - RoutingNode routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); + RoutingNode routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(7)); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED) + routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(22)); @@ -160,11 +160,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { Thread.sleep(200); clusterState2 = clusterService2.state(); - routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), equalTo(11)); clusterState3 = clusterService3.state(); - routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); + routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11)); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(22)); @@ -176,11 +176,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { Thread.sleep(200); clusterState2 = clusterService2.state(); - routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2, nullValue()); clusterState3 = clusterService3.state(); - routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); + routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); assertThat(routingNodeEntry3, nullValue()); } @@ -211,7 +211,7 @@ public class IndexLifecycleActionTests extends AbstractServersTests { assertThat(clusterHealth.activePrimaryShards(), equalTo(11)); ClusterState clusterState1 = clusterService1.state(); - RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + RoutingNode routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11)); // start another server @@ -235,11 +235,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { ClusterService clusterService2 = ((InternalServer) server("server2")).injector().getInstance(ClusterService.class); clusterState1 = clusterService1.state(); - routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(6), equalTo(5))); ClusterState clusterState2 = clusterService2.state(); - RoutingNode routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + RoutingNode routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6))); // start another server @@ -263,15 +263,15 @@ public class IndexLifecycleActionTests extends AbstractServersTests { Thread.sleep(200); clusterState1 = clusterService1.state(); - routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); + routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(3))); clusterState2 = clusterService2.state(); - routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(3))); ClusterState clusterState3 = clusterService3.state(); - RoutingNode routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); + RoutingNode routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(3)); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED) + routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11)); @@ -295,11 +295,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { Thread.sleep(200); clusterState2 = clusterService2.state(); - routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6))); clusterState3 = clusterService3.state(); - routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); + routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6))); assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11)); @@ -310,11 +310,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests { assertThat(deleteIndexResponse.acknowledged(), equalTo(true)); clusterState2 = clusterService2.state(); - routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); + routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2, nullValue()); clusterState3 = clusterService3.state(); - routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); + routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId()); assertThat(routingNodeEntry3, nullValue()); } diff --git a/modules/test/testng/src/main/java/log4j.properties b/modules/test/testng/src/main/java/log4j.properties index 199132b5b7f..86d72561ccf 100644 --- a/modules/test/testng/src/main/java/log4j.properties +++ b/modules/test/testng/src/main/java/log4j.properties @@ -2,11 +2,13 @@ log4j.rootLogger=INFO, out log4j.logger.jgroups=WARN #log4j.logger.discovery=TRACE -#log4j.logger.cluster=TRACE +#log4j.logger.cluster.service=TRACE +#log4j.logger.cluster.action.shard=DEBUG #log4j.logger.indices.cluster=DEBUG #log4j.logger.index=TRACE #log4j.logger.index.engine=DEBUG -#log4j.logger.index.shard.recovery=TRACE +#log4j.logger.index.shard.service=DEBUG +#log4j.logger.index.shard.recovery=DEBUG #log4j.logger.index.cache=DEBUG #log4j.logger.http=TRACE #log4j.logger.monitor.memory=TRACE