Add validation of the routing table, expose it in cluster health, and add more information when a shard starts or fails

This commit is contained in:
kimchy 2010-02-25 18:40:29 +02:00
parent f5a4296bf7
commit 70726b268f
42 changed files with 560 additions and 136 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.elasticsearch.action.ActionResponse;
@ -26,8 +27,10 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth.*;
/**
@ -47,19 +50,41 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
ClusterHealthStatus status = ClusterHealthStatus.RED;
private List<String> validationFailures;
Map<String, ClusterIndexHealth> indices = Maps.newHashMap();
ClusterHealthResponse() {
}
public ClusterHealthResponse(String clusterName) {
public ClusterHealthResponse(String clusterName, List<String> validationFailures) {
this.clusterName = clusterName;
this.validationFailures = validationFailures;
}
public String clusterName() {
return clusterName;
}
/**
* The validation failures on the cluster level (without index validation failures).
*/
public List<String> validationFailures() {
return this.validationFailures;
}
/**
 * Collects every validation failure known to this response: the cluster level
 * failures first, followed by the failures of each index health entry.
 *
 * @return a new list holding the cluster level and index level failures
 */
public List<String> allValidationFailures() {
    final List<String> combined = newArrayList(validationFailures());
    for (ClusterIndexHealth health : indices.values()) {
        List<String> perIndex = health.validationFailures();
        combined.addAll(perIndex);
    }
    return combined;
}
public int activeShards() {
return activeShards;
}
@ -103,6 +128,14 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
indices.put(indexHealth.index(), indexHealth);
}
timedOut = in.readBoolean();
// Read the cluster level validation failures: a count followed by one UTF string per failure.
size = in.readInt();
if (size == 0) {
    validationFailures = ImmutableList.of();
} else {
    // The field has no initializer and the no-arg constructor leaves it unset, so the
    // list must be allocated here before it is filled (otherwise add() fails with an NPE).
    validationFailures = newArrayListWithCapacity(size);
    for (int i = 0; i < size; i++) {
        validationFailures.add(in.readUTF());
    }
}
}
@Override public void writeTo(DataOutput out) throws IOException {
@ -116,6 +149,11 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
indexHealth.writeTo(out);
}
out.writeBoolean(timedOut);
out.writeInt(validationFailures.size());
for (String failure : validationFailures) {
out.writeUTF(failure);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.elasticsearch.util.io.Streamable;
@ -26,6 +27,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.admin.cluster.health.ClusterShardHealth.*;
@ -51,19 +53,26 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
final Map<Integer, ClusterShardHealth> shards = Maps.newHashMap();
List<String> validationFailures;
private ClusterIndexHealth() {
}
public ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas) {
public ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas, List<String> validationFailures) {
this.index = index;
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.validationFailures = validationFailures;
}
public String index() {
return index;
}
public List<String> validationFailures() {
return this.validationFailures;
}
public int numberOfShards() {
return numberOfShards;
}
@ -116,6 +125,14 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
ClusterShardHealth shardHealth = readClusterShardHealth(in);
shards.put(shardHealth.id(), shardHealth);
}
// Read the index level validation failures: a count followed by one UTF string per failure.
size = in.readInt();
if (size == 0) {
    validationFailures = ImmutableList.of();
} else {
    // The field has no initializer and the private no-arg constructor leaves it unset, so
    // the list must be allocated here before it is filled (otherwise add() fails with an NPE).
    validationFailures = new java.util.ArrayList<String>(size);
    for (int i = 0; i < size; i++) {
        validationFailures.add(in.readUTF());
    }
}
}
@Override public void writeTo(DataOutput out) throws IOException {
@ -131,5 +148,10 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
for (ClusterShardHealth shardHealth : this) {
shardHealth.writeTo(out);
}
out.writeInt(validationFailures.size());
for (String failure : validationFailures) {
out.writeUTF(failure);
}
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTableValidation;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
@ -105,8 +106,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request) {
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value());
ClusterState clusterState = clusterService.state();
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures());
String[] indices = processIndices(clusterState, request.indices());
for (String index : indices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
@ -114,7 +117,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (indexRoutingTable == null) {
continue;
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas());
ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), validation.indexFailures(indexMetaData.index()));
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id());
@ -151,13 +154,17 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
// update the index status
indexHealth.status = ClusterHealthStatus.GREEN;
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.status() == ClusterHealthStatus.RED) {
indexHealth.status = ClusterHealthStatus.RED;
break;
}
if (shardHealth.status() == ClusterHealthStatus.YELLOW) {
indexHealth.status = ClusterHealthStatus.YELLOW;
if (!indexHealth.validationFailures().isEmpty()) {
indexHealth.status = ClusterHealthStatus.RED;
} else {
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.status() == ClusterHealthStatus.RED) {
indexHealth.status = ClusterHealthStatus.RED;
break;
}
if (shardHealth.status() == ClusterHealthStatus.YELLOW) {
indexHealth.status = ClusterHealthStatus.YELLOW;
}
}
}
@ -171,17 +178,20 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
response.status = ClusterHealthStatus.GREEN;
for (ClusterIndexHealth indexHealth : response) {
if (indexHealth.status() == ClusterHealthStatus.RED) {
response.status = ClusterHealthStatus.RED;
break;
}
if (indexHealth.status() == ClusterHealthStatus.YELLOW) {
response.status = ClusterHealthStatus.YELLOW;
if (!response.validationFailures().isEmpty()) {
response.status = ClusterHealthStatus.RED;
} else {
for (ClusterIndexHealth indexHealth : response) {
if (indexHealth.status() == ClusterHealthStatus.RED) {
response.status = ClusterHealthStatus.RED;
break;
}
if (indexHealth.status() == ClusterHealthStatus.YELLOW) {
response.status = ClusterHealthStatus.YELLOW;
}
}
}
return response;
}
}

View File

@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.InternalIndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -30,7 +30,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -24,7 +24,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.TransportSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -35,8 +35,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -52,6 +52,8 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.ExceptionsHelper.*;
/**
* @author kimchy (Shay Banon)
*/
@ -394,7 +396,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override public void handleException(RemoteTransportException exp) {
if (!ignoreBackupException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction() + " on backup " + shards.shardId(), exp);
shardStateAction.shardFailed(shard);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on backup, message [" + detailedMessage(exp) + "]");
}
finishIfPossible();
}
@ -427,7 +429,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} catch (Exception e) {
if (!ignoreBackupException(e)) {
logger.warn("Failed to perform " + transportAction() + " on backup " + shards.shardId(), e);
shardStateAction.shardFailed(shard);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on backup, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
@ -441,7 +443,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} catch (Exception e) {
if (!ignoreBackupException(e)) {
logger.warn("Failed to perform " + transportAction() + " on backup " + shards.shardId(), e);
shardStateAction.shardFailed(shard);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on backup, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {

View File

@ -37,7 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -21,7 +21,7 @@ package org.elasticsearch.client.transport;
import com.google.inject.AbstractModule;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.DefaultClusterService;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.util.logging.Loggers;
import org.elasticsearch.util.settings.NoClassSettingsException;
@ -41,7 +41,7 @@ public class TransportClientClusterModule extends AbstractModule {
@Override protected void configure() {
try {
new DiscoveryModule(settings).configure(binder());
bind(ClusterService.class).to(DefaultClusterService.class).asEagerSingleton();
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
bind(TransportClientClusterService.class).asEagerSingleton();
} catch (NoClassSettingsException e) {
// that's fine, no actual implementation for discovery

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.strategy.DefaultShardsRoutingStrategy;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.util.settings.Settings;
/**
@ -48,7 +49,7 @@ public class ClusterModule extends AbstractModule {
.to(settings.getAsClass("cluster.routing.shards.type", DefaultShardsRoutingStrategy.class))
.asEagerSingleton();
bind(ClusterService.class).to(DefaultClusterService.class).asEagerSingleton();
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
bind(MetaDataService.class).asEagerSingleton();
bind(RoutingService.class).asEagerSingleton();

View File

@ -72,10 +72,15 @@ public class ClusterState {
return routingTable;
}
/**
* Returns a built (on demand) routing nodes view of the routing table.
*/
public RoutingNodes routingNodes() {
return routingTable.routingNodes(metaData);
}
/**
* Returns a built (on demand) routing nodes view of the routing table. <b>NOTE, the routing nodes
* are immutable, use them just for read operations</b>
*/
public RoutingNodes readOnlyRoutingNodes() {
if (routingNodes != null) {
return routingNodes;
}
@ -132,11 +137,6 @@ public class ClusterState {
return this;
}
Builder incrementVersion() {
this.version++;
return this;
}
public ClusterState build() {
return new ClusterState(version, metaData, routingTable, nodes);
}

View File

@ -25,7 +25,10 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.node.Nodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
@ -33,11 +36,17 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.Streamable;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*;
/**
* @author kimchy (Shay Banon)
@ -64,41 +73,41 @@ public class ShardStateAction extends AbstractComponent {
transportService.registerHandler(ShardFailedTransportHandler.ACTION, new ShardFailedTransportHandler());
}
public void shardFailed(final ShardRouting shardRouting) throws ElasticSearchException {
logger.warn("Sending failed shard for {}", shardRouting);
public void shardFailed(final ShardRouting shardRouting, final String reason) throws ElasticSearchException {
logger.warn("Sending failed shard for {}, reason [{}]", shardRouting, reason);
Nodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
@Override public void run() {
innerShardFailed(shardRouting);
innerShardFailed(shardRouting, reason);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardFailedTransportHandler.ACTION, shardRouting, VoidTransportResponseHandler.INSTANCE);
ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), VoidTransportResponseHandler.INSTANCE);
}
}
public void shardStarted(final ShardRouting shardRouting) throws ElasticSearchException {
public void shardStarted(final ShardRouting shardRouting, final String reason) throws ElasticSearchException {
if (logger.isDebugEnabled()) {
logger.debug("Sending shard started for {}", shardRouting);
logger.debug("Sending shard started for {}, reason [{}]", shardRouting, reason);
}
Nodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
@Override public void run() {
innerShardStarted(shardRouting);
innerShardStarted(shardRouting, reason);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardStartedTransportHandler.ACTION, shardRouting, VoidTransportResponseHandler.INSTANCE);
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), VoidTransportResponseHandler.INSTANCE);
}
}
private void innerShardFailed(final ShardRouting shardRouting) {
logger.warn("Received shard failed for {}", shardRouting);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + ")", new ClusterStateUpdateTask() {
private void innerShardFailed(final ShardRouting shardRouting, final String reason) {
logger.warn("Received shard failed for {}, reason [{}]", shardRouting, reason);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
@ -108,7 +117,7 @@ public class ShardStateAction extends AbstractComponent {
return currentState;
}
if (logger.isDebugEnabled()) {
logger.debug("Applying failed shard {}", shardRouting);
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingTable prevRoutingTable = currentState.routingTable();
RoutingTable newRoutingTable = shardsRoutingStrategy.applyFailedShards(currentState, newArrayList(shardRouting));
@ -120,11 +129,11 @@ public class ShardStateAction extends AbstractComponent {
});
}
private void innerShardStarted(final ShardRouting shardRouting) {
private void innerShardStarted(final ShardRouting shardRouting, final String reason) {
if (logger.isDebugEnabled()) {
logger.debug("Received shard started for {}", shardRouting);
logger.debug("Received shard started for {}, reason [{}]", shardRouting, reason);
}
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + ")", new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
@ -147,7 +156,7 @@ public class ShardStateAction extends AbstractComponent {
}
}
if (logger.isDebugEnabled()) {
logger.debug("Applying started shard {}", shardRouting);
logger.debug("Applying started shard {}, reason [{}]", shardRouting, reason);
}
RoutingTable newRoutingTable = shardsRoutingStrategy.applyStartedShards(currentState, newArrayList(shardRouting));
if (routingTable == newRoutingTable) {
@ -158,31 +167,56 @@ public class ShardStateAction extends AbstractComponent {
});
}
private class ShardFailedTransportHandler extends BaseTransportRequestHandler<ShardRouting> {
private class ShardFailedTransportHandler extends BaseTransportRequestHandler<ShardRoutingEntry> {
static final String ACTION = "cluster/shardFailure";
@Override public ShardRouting newInstance() {
return new ImmutableShardRouting();
@Override public ShardRoutingEntry newInstance() {
return new ShardRoutingEntry();
}
@Override public void messageReceived(ShardRouting request, TransportChannel channel) throws Exception {
innerShardFailed(request);
@Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardFailed(request.shardRouting, request.reason);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
private class ShardStartedTransportHandler extends BaseTransportRequestHandler<ShardRouting> {
private class ShardStartedTransportHandler extends BaseTransportRequestHandler<ShardRoutingEntry> {
static final String ACTION = "cluster/shardStarted";
@Override public ShardRouting newInstance() {
return new ImmutableShardRouting();
@Override public ShardRoutingEntry newInstance() {
return new ShardRoutingEntry();
}
@Override public void messageReceived(ShardRouting request, TransportChannel channel) throws Exception {
innerShardStarted(request);
@Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardStarted(request.shardRouting, request.reason);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
/**
 * Transport payload pairing a shard routing with the textual reason for the
 * started / failed notification it is sent with.
 */
private static class ShardRoutingEntry implements Streamable {

    private ShardRouting shardRouting;

    private String reason;

    /** Creates an empty entry; the fields are populated by {@link #readFrom(DataInput)}. */
    private ShardRoutingEntry() {
    }

    private ShardRoutingEntry(ShardRouting shardRouting, String reason) {
        this.shardRouting = shardRouting;
        this.reason = reason;
    }

    /** Reads the shard routing first and the reason second, mirroring {@link #writeTo(DataOutput)}. */
    @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
        this.shardRouting = readShardRoutingEntry(in);
        this.reason = in.readUTF();
    }

    /** Writes the shard routing followed by the reason as a UTF string. */
    @Override public void writeTo(DataOutput out) throws IOException {
        this.shardRouting.writeTo(out);
        out.writeUTF(this.reason);
    }
}
}

View File

@ -20,8 +20,10 @@
package org.elasticsearch.cluster.routing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.util.IdentityHashSet;
import org.elasticsearch.util.concurrent.Immutable;
@ -30,6 +32,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @author kimchy (Shay Banon)
@ -52,6 +55,33 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return this.index;
}
/**
 * Validates this index routing table against the provided metadata, recording every
 * problem found as an index failure on the given validation object.
 *
 * <p>Checks performed: the index exists in the metadata, the number of shards in the
 * routing table matches the metadata, and each shard has the number of replicas the
 * metadata declares.
 *
 * @param validation the validation result failures are added to
 * @param metaData   the metadata to validate against
 */
public void validate(RoutingTableValidation validation, MetaData metaData) {
    final String indexName = index();

    // Nothing else can be checked without the index metadata.
    if (!metaData.hasIndex(indexName)) {
        validation.addIndexFailure(indexName, "Exists in routing does not exists in metadata");
        return;
    }
    IndexMetaData indexMetaData = metaData.index(indexName);

    // Shard count: start from every expected shard id and strike out those present in the routing table.
    if (shards().size() != indexMetaData.numberOfShards()) {
        Set<Integer> missingShardIds = Sets.newHashSet();
        for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
            missingShardIds.add(shardId);
        }
        for (IndexShardRoutingTable shardTable : this) {
            missingShardIds.remove(shardTable.shardId().id());
        }
        validation.addIndexFailure(indexName, "Wrong number of shards in routing table, missing: " + missingShardIds);
    }

    // Replica count: every shard group holds one primary, the remaining entries are replicas.
    for (IndexShardRoutingTable shardTable : this) {
        int actualReplicas = shardTable.size() - 1;
        if (actualReplicas == indexMetaData.numberOfReplicas()) {
            continue;
        }
        validation.addIndexFailure(indexName, "Shard [" + shardTable.shardId().id()
                + "] routing table has wrong number of replicas, expected [" + indexMetaData.numberOfReplicas() + "], got [" + actualReplicas + "]");
    }
}
@Override public UnmodifiableIterator<IndexShardRoutingTable> iterator() {
return shards.values().iterator();
}
@ -174,7 +204,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
public String prettyPrint() {
StringBuilder sb = new StringBuilder("-- Index[" + index + "]\n");
for (IndexShardRoutingTable indexShard : this) {
sb.append("----ShardId[").append(indexShard.shardId()).append("]\n");
sb.append("----ShardId[").append(indexShard.shardId().index().name()).append("][").append(indexShard.shardId().id()).append("]\n");
for (ShardRouting shard : indexShard) {
sb.append("--------").append(shard.shortSummary()).append("\n");
}

View File

@ -140,6 +140,10 @@ public class RoutingService extends AbstractComponent implements ClusterStateLis
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(currentState);
if (newRoutingTable == currentState.routingTable()) {
// no state changed
return currentState;
}
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
}
});

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
@ -70,6 +71,27 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return new RoutingNodes(metaData, this);
}
/**
 * Validates this routing table against the metadata and fails fast when it is not valid.
 *
 * @param metaData the metadata to validate against
 * @return this routing table, when validation passed
 * @throws RoutingValidationException carrying the validation result when it is not valid
 */
public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {
    final RoutingTableValidation result = validate(metaData);
    if (result.valid()) {
        return this;
    }
    throw new RoutingValidationException(result);
}
/**
 * Validates this routing table against the metadata in both directions: every index in
 * the metadata must have a routing entry, and every index routing table is asked to
 * validate itself against the metadata.
 *
 * @param metaData the metadata to validate against
 * @return the validation result holding any failures found
 */
public RoutingTableValidation validate(MetaData metaData) {
    final RoutingTableValidation result = new RoutingTableValidation();

    // metadata -> routing table
    for (IndexMetaData indexMetaData : metaData) {
        String indexName = indexMetaData.index();
        if (indicesRouting.containsKey(indexName)) {
            continue;
        }
        result.addIndexFailure(indexName, "Exists in metadata and does not exists in routing table");
    }

    // routing table -> metadata
    for (IndexRoutingTable indexRouting : this) {
        indexRouting.validate(result, metaData);
    }
    return result;
}
/**
* All the shards (replicas) for the provided indices.
*

View File

@ -0,0 +1,169 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.elasticsearch.util.io.Streamable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.*;
import static com.google.common.collect.Maps.*;
/**
* @author kimchy (shay.banon)
*/
public class RoutingTableValidation implements Serializable, Streamable {
private boolean valid = true;
private List<String> failures;
private Map<String, List<String>> indicesFailures;
public RoutingTableValidation() {
}
public boolean valid() {
return valid;
}
public List<String> allFailures() {
if (failures().isEmpty() && indicesFailures().isEmpty()) {
return ImmutableList.of();
}
List<String> allFailures = newArrayList(failures());
for (Map.Entry<String, List<String>> entry : indicesFailures().entrySet()) {
for (String failure : entry.getValue()) {
allFailures.add("Index [" + entry.getKey() + "]: " + failure);
}
}
return allFailures;
}
public List<String> failures() {
if (failures == null) {
return ImmutableList.of();
}
return failures;
}
public Map<String, List<String>> indicesFailures() {
if (indicesFailures == null) {
return ImmutableMap.of();
}
return indicesFailures;
}
public List<String> indexFailures(String index) {
if (indicesFailures == null) {
return ImmutableList.of();
}
List<String> indexFailures = indicesFailures.get(index);
if (indexFailures == null) {
return ImmutableList.of();
}
return indexFailures;
}
public void addFailure(String failure) {
valid = false;
if (failures == null) {
failures = newArrayList();
}
failures.add(failure);
}
public void addIndexFailure(String index, String failure) {
valid = false;
if (indicesFailures == null) {
indicesFailures = newHashMap();
}
List<String> indexFailures = indicesFailures.get(index);
if (indexFailures == null) {
indexFailures = Lists.newArrayList();
indicesFailures.put(index, indexFailures);
}
indexFailures.add(failure);
}
@Override public String toString() {
return allFailures().toString();
}
/**
 * Restores this validation from a stream, replacing any state it held before.
 * The layout read is: the valid flag, the number of global failures followed by
 * each one as UTF, then the number of indices followed, per index, by its name,
 * its failure count and each failure as UTF.
 *
 * <p>When a count is zero the matching field is reset to <tt>null</tt> rather than
 * to an immutable empty collection. The accessors already translate <tt>null</tt>
 * into an empty result, and this keeps {@link #addFailure(String)} and
 * {@link #addIndexFailure(String, String)} usable on a deserialized instance:
 * they only create their collection lazily when the field is <tt>null</tt>, so an
 * immutable placeholder would make them throw {@link UnsupportedOperationException}.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
    valid = in.readBoolean();

    int failureCount = in.readInt();
    if (failureCount == 0) {
        // null means "nothing recorded", same as a freshly constructed instance
        failures = null;
    } else {
        failures = newArrayListWithCapacity(failureCount);
        for (int i = 0; i < failureCount; i++) {
            failures.add(in.readUTF());
        }
    }

    int indexCount = in.readInt();
    if (indexCount == 0) {
        indicesFailures = null;
    } else {
        indicesFailures = newHashMap();
        for (int i = 0; i < indexCount; i++) {
            String index = in.readUTF();
            int indexFailureCount = in.readInt();
            List<String> indexFailures = newArrayListWithCapacity(indexFailureCount);
            for (int j = 0; j < indexFailureCount; j++) {
                indexFailures.add(in.readUTF());
            }
            indicesFailures.put(index, indexFailures);
        }
    }
}
/**
 * Writes this validation to a stream: the valid flag, the number of global
 * failures followed by each one as UTF, then the number of indices followed, per
 * index, by its name, its failure count and each failure as UTF. Collections that
 * were never created are written as a zero count.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override public void writeTo(DataOutput out) throws IOException {
    out.writeBoolean(valid);

    // the accessors substitute an empty collection for null, which yields the zero count
    List<String> global = failures();
    out.writeInt(global.size());
    for (String message : global) {
        out.writeUTF(message);
    }

    Map<String, List<String>> perIndex = indicesFailures();
    out.writeInt(perIndex.size());
    for (Map.Entry<String, List<String>> indexEntry : perIndex.entrySet()) {
        List<String> messages = indexEntry.getValue();
        out.writeUTF(indexEntry.getKey());
        out.writeInt(messages.size());
        for (String message : messages) {
            out.writeUTF(message);
        }
    }
}
}

View File

@ -24,11 +24,14 @@ package org.elasticsearch.cluster.routing;
*/
public class RoutingValidationException extends RoutingException {
public RoutingValidationException(String message) {
super(message);
private final RoutingTableValidation validation;
public RoutingValidationException(RoutingTableValidation validation) {
super(validation.toString());
this.validation = validation;
}
public RoutingValidationException(String message, Throwable cause) {
super(message, cause);
public RoutingTableValidation validation() {
return this.validation;
}
}

View File

@ -40,7 +40,7 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy {
if (!applyStartedShards(routingNodes, startedShardEntries)) {
return clusterState.routingTable();
}
return new RoutingTable.Builder().updateNodes(routingNodes).build();
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
}
@Override public RoutingTable applyFailedShards(ClusterState clusterState, Iterable<? extends ShardRouting> failedShardEntries) {
@ -48,7 +48,7 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy {
if (!applyFailedShards(routingNodes, failedShardEntries)) {
return clusterState.routingTable();
}
return new RoutingTable.Builder().updateNodes(routingNodes).build();
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
}
@Override public RoutingTable reroute(ClusterState clusterState) {
@ -78,7 +78,7 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy {
return clusterState.routingTable();
}
return new RoutingTable.Builder().updateNodes(routingNodes).build();
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
}
private boolean rebalance(RoutingNodes routingNodes) {

View File

@ -17,10 +17,11 @@
* under the License.
*/
package org.elasticsearch.cluster;
package org.elasticsearch.cluster.service;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.Nodes;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;
@ -44,7 +45,7 @@ import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
/**
* @author kimchy (Shay Banon)
*/
public class DefaultClusterService extends AbstractComponent implements ClusterService {
public class InternalClusterService extends AbstractComponent implements ClusterService {
private final Lifecycle lifecycle = new Lifecycle();
@ -66,7 +67,7 @@ public class DefaultClusterService extends AbstractComponent implements ClusterS
private volatile ClusterState clusterState = newClusterStateBuilder().build();
@Inject public DefaultClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool) {
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool) {
super(settings);
this.transportService = transportService;
this.discoveryService = discoveryService;
@ -90,7 +91,7 @@ public class DefaultClusterService extends AbstractComponent implements ClusterS
for (final TimeoutHolder holder : clusterStateTimeoutListeners) {
if ((timestamp - holder.timestamp) > holder.timeout.millis()) {
clusterStateTimeoutListeners.remove(holder);
DefaultClusterService.this.threadPool.execute(new Runnable() {
InternalClusterService.this.threadPool.execute(new Runnable() {
@Override public void run() {
holder.listener.onTimeout(holder.timeout);
}
@ -158,18 +159,33 @@ public class DefaultClusterService extends AbstractComponent implements ClusterS
return;
}
ClusterState previousClusterState = clusterState;
clusterState = updateTask.execute(previousClusterState);
try {
clusterState = updateTask.execute(previousClusterState);
} catch (Exception e) {
StringBuilder sb = new StringBuilder("Failed to execute cluster state update, state:\nVersion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
sb.append(clusterState.nodes().prettyPrint());
sb.append(clusterState.routingTable().prettyPrint());
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
logger.warn(sb.toString(), e);
return;
}
if (previousClusterState != clusterState) {
if (clusterState.nodes().localNodeMaster()) {
// only the master controls the version numbers
clusterState = newClusterStateBuilder().state(clusterState).incrementVersion().build();
clusterState = new ClusterState(clusterState.version() + 1, clusterState.metaData(), clusterState.routingTable(), clusterState.nodes());
} else {
// we got this cluster state from the master, filter out based on versions (don't call listeners)
if (clusterState.version() < previousClusterState.version()) {
logger.info("Got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
return;
}
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("Cluster State updated, version [").append(clusterState.version()).append("], source [").append(source).append("]\n");
StringBuilder sb = new StringBuilder("Cluster State updated:\nVersion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
sb.append(clusterState.nodes().prettyPrint());
sb.append(clusterState.routingTable().prettyPrint());
sb.append(clusterState.routingNodes().prettyPrint());
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("Cluster state updated, version [{}], source [{}]", clusterState.version(), source);

View File

@ -25,7 +25,7 @@ import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.routing.OperationRouting;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import java.util.Set;

View File

@ -37,11 +37,11 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.routing.OperationRouting;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardManagement;
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.recovery.RecoveryAction;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;

View File

@ -25,7 +25,12 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;

View File

@ -31,9 +31,9 @@ import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.InternalIndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;

View File

@ -26,9 +26,9 @@ import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.InternalIndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.shard;
import com.google.inject.Inject;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.jmx.JmxService;

View File

@ -21,6 +21,8 @@ package org.elasticsearch.index.shard;
import com.google.inject.AbstractModule;
import org.elasticsearch.index.shard.recovery.RecoveryAction;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
/**
* @author kimchy (Shay Banon)

View File

@ -29,6 +29,8 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.memory.MemorySnapshot;
@ -349,7 +351,13 @@ public class RecoveryAction extends AbstractIndexShardComponent {
sendSnapshot(snapshot, true);
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
indexShard.relocated();
try {
indexShard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to be closed by the time we get to the relocated method
}
}
stopWatch.stop();
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.shard;
package org.elasticsearch.index.shard.service;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchException;
@ -25,6 +25,9 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.IndexShardLifecycle;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.concurrent.ThreadSafe;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.shard;
package org.elasticsearch.index.shard.service;
import com.google.inject.Inject;
import org.apache.lucene.document.Document;
@ -39,6 +39,7 @@ import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.TypeMissingException;
@ -95,6 +96,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.queryParserService = queryParserService;
this.filterCache = filterCache;
state = IndexShardState.CREATED;
logger.debug("Moved to state [CREATED]");
}
public Store store() {
@ -143,6 +145,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
throw new IndexShardRecoveringException(shardId);
}
state = IndexShardState.RECOVERING;
logger.debug("Moved to state [RECOVERING]");
return returnValue;
}
}
@ -152,6 +155,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (this.state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
logger.debug("Restored to state [{}] from state [{}]", stateToRestore, state);
this.state = stateToRestore;
}
return this;
@ -162,6 +166,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
logger.debug("Moved to state [RELOCATED]");
state = IndexShardState.RELOCATED;
}
return this;
@ -180,6 +185,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
engine.start();
scheduleRefresherIfNeeded();
logger.debug("Moved to state [STARTED]");
state = IndexShardState.STARTED;
}
return this;
@ -379,6 +385,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
refreshScheduledFuture = null;
}
}
logger.debug("Moved to state [CLOSED]");
state = IndexShardState.CLOSED;
}
}
@ -390,6 +397,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.start();
applyTranslogOperations(operations);
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from gateway)");
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();
@ -406,6 +414,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
applyTranslogOperations(snapshot);
if (phase3) {
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from another shard)");
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();

View File

@ -22,8 +22,8 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.Strings;

View File

@ -24,9 +24,9 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.InternalIndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;

View File

@ -43,11 +43,11 @@ import org.elasticsearch.index.gateway.IgnoreGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.InternalIndexShard;
import org.elasticsearch.index.shard.recovery.IgnoreRecoveryException;
import org.elasticsearch.index.shard.recovery.RecoveryAction;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.component.AbstractComponent;
@ -59,6 +59,7 @@ import java.util.Map;
import java.util.Set;
import static com.google.common.collect.Sets.*;
import static org.elasticsearch.ExceptionsHelper.*;
/**
* @author kimchy (Shay Banon)
@ -146,7 +147,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
RoutingTable routingTable = event.state().routingTable();
RoutingNode routingNodes = event.state().routingNodes().nodesToShards().get(event.state().nodes().localNodeId());
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes != null) {
applyShards(routingNodes, routingTable, event.state().nodes());
}
@ -238,7 +239,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
if (!indexService.hasShard(shardId) && shardRouting.started()) {
// the master thinks we are started, but we don't have this shard at all, mark it as failed
logger.warn("[" + shardRouting.index() + "][" + shardRouting.shardId().id() + "] Master " + nodes.masterNode() + " marked shard as started, but shard have not been created, mark shard as failed");
shardStateAction.shardFailed(shardRouting);
shardStateAction.shardFailed(shardRouting, "Master " + nodes.masterNode() + " marked shard as started, but shard have not been created, mark shard as failed");
continue;
}
@ -268,7 +269,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
if (logger.isTraceEnabled()) {
logger.trace("[" + shardRouting.index() + "][" + shardRouting.shardId().id() + "] Master " + nodes.masterNode() + " marked shard as initializing, but shard already started, mark shard as started");
}
shardStateAction.shardStarted(shardRouting);
shardStateAction.shardStarted(shardRouting, "Master " + nodes.masterNode() + " marked shard as initializing, but shard already started, mark shard as started");
return;
} else {
if (indexShard.ignoreRecoveryAttempt()) {
@ -293,7 +294,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
} catch (Exception e1) {
logger.warn("Failed to delete shard after failed creation for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e1);
}
shardStateAction.shardFailed(shardRouting);
shardStateAction.shardFailed(shardRouting, "Failed to create shard, message [" + detailedMessage(e) + "]");
return;
}
}
@ -323,7 +324,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
recoveryAction.startRecovery(nodes.localNode(), node, false);
shardStateAction.shardStarted(shardRouting);
shardStateAction.shardStarted(shardRouting, "after recovery (backup) from node [" + node + "]");
} catch (IgnoreRecoveryException e) {
// that's fine, since we might be called concurrently, just ignore this
break;
@ -337,7 +338,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
try {
shardGatewayService.recover();
shardStateAction.shardStarted(shardRouting);
shardStateAction.shardStarted(shardRouting, "after recovery from gateway");
} catch (IgnoreGatewayRecoveryException e) {
// that's fine, we might be called concurrently, just ignore this, we already recovered
}
@ -345,9 +346,10 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
// relocating primaries, recovery from the relocating shard
Node node = nodes.get(shardRouting.relocatingNodeId());
try {
// we mark the primary we are going to recover from as relocated
// we mark the primary we are going to recover from as relocated at the end of phase 3
// so operations will start moving to the new primary
recoveryAction.startRecovery(nodes.localNode(), node, true);
shardStateAction.shardStarted(shardRouting);
shardStateAction.shardStarted(shardRouting, "after recovery (primary) from node [" + node + "]");
} catch (IgnoreRecoveryException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
}
@ -363,7 +365,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
}
}
try {
shardStateAction.shardFailed(shardRouting);
shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(e) + "]");
} catch (Exception e1) {
logger.warn("Failed to mark shard as failed after a failed start for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e);
}

View File

@ -88,6 +88,31 @@ public class RestClusterHealthAction extends BaseRestHandler {
builder.field("activeShards", response.activeShards());
builder.field("relocatingShards", response.relocatingShards());
if (!response.validationFailures().isEmpty()) {
builder.startArray("validationFailures");
for (String validationFailure : response.validationFailures()) {
builder.string(validationFailure);
}
// if we don't print index level information, still print the index validation failures
// so we know why the status is red
if (fLevel == 0) {
for (ClusterIndexHealth indexHealth : response) {
builder.startObject(indexHealth.index());
if (!indexHealth.validationFailures().isEmpty()) {
builder.startArray("validationFailures");
for (String validationFailure : indexHealth.validationFailures()) {
builder.string(validationFailure);
}
builder.endArray();
}
builder.endObject();
}
}
builder.endArray();
}
if (fLevel > 0) {
builder.startObject("indices");
for (ClusterIndexHealth indexHealth : response) {
@ -100,6 +125,14 @@ public class RestClusterHealthAction extends BaseRestHandler {
builder.field("activeShards", indexHealth.activeShards());
builder.field("relocatingShards", indexHealth.relocatingShards());
if (!indexHealth.validationFailures().isEmpty()) {
builder.startArray("validationFailures");
for (String validationFailure : indexHealth.validationFailures()) {
builder.string(validationFailure);
}
builder.endArray();
}
if (fLevel > 1) {
builder.startObject("shards");

View File

@ -103,12 +103,12 @@ public class RestClusterStateAction extends BaseRestHandler {
// routing nodes
builder.startObject("routingNodes");
builder.startArray("unassigned");
for (ShardRouting shardRouting : state.routingNodes().unassigned()) {
for (ShardRouting shardRouting : state.readOnlyRoutingNodes().unassigned()) {
jsonShardRouting(builder, shardRouting);
}
builder.endArray();
builder.startObject("nodes");
for (RoutingNode routingNode : state.routingNodes()) {
for (RoutingNode routingNode : state.readOnlyRoutingNodes()) {
builder.startArray(routingNode.nodeId());
for (ShardRouting shardRouting : routingNode) {
jsonShardRouting(builder, shardRouting);

View File

@ -29,7 +29,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.dfs.CachedDfSource;
import org.elasticsearch.search.dfs.DfsPhase;

View File

@ -40,8 +40,14 @@ public class AbstractComponent {
this.componentSettings = settings.getComponentSettings(getClass());
}
public AbstractComponent(Settings settings, Class componentClass) {
this.logger = Loggers.getLogger(componentClass, settings);
public AbstractComponent(Settings settings, Class customClass) {
this.logger = Loggers.getLogger(customClass, settings);
this.settings = settings;
this.componentSettings = settings.getComponentSettings(customClass);
}
public AbstractComponent(Settings settings, Class loggerClass, Class componentClass) {
this.logger = Loggers.getLogger(loggerClass, settings);
this.settings = settings;
this.componentSettings = settings.getComponentSettings(componentClass);
}

View File

@ -31,6 +31,8 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.ram.RamStore;

View File

@ -79,11 +79,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
ClusterState clusterState1 = clusterService1.state();
RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
RoutingNode routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11));
clusterState1 = client("server1").admin().cluster().state(clusterState()).actionGet().state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11));
logger.info("Starting server2");
@ -102,11 +102,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
Thread.sleep(200);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11));
ClusterState clusterState2 = clusterService2.state();
RoutingNode routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
RoutingNode routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), equalTo(11));
logger.info("Starting server3");
@ -128,15 +128,15 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
Thread.sleep(200);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(7), equalTo(8)));
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(7), equalTo(8)));
ClusterState clusterState3 = clusterService3.state();
RoutingNode routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
RoutingNode routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(7));
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED) + routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(22));
@ -160,11 +160,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
Thread.sleep(200);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), equalTo(11));
clusterState3 = clusterService3.state();
routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11));
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(22));
@ -176,11 +176,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
Thread.sleep(200);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2, nullValue());
clusterState3 = clusterService3.state();
routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3, nullValue());
}
@ -211,7 +211,7 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
ClusterState clusterState1 = clusterService1.state();
RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
RoutingNode routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11));
// start another server
@ -235,11 +235,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
ClusterService clusterService2 = ((InternalServer) server("server2")).injector().getInstance(ClusterService.class);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(6), equalTo(5)));
ClusterState clusterState2 = clusterService2.state();
RoutingNode routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
RoutingNode routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6)));
// start another server
@ -263,15 +263,15 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
Thread.sleep(200);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
routingNodeEntry1 = clusterState1.readOnlyRoutingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(3)));
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(3)));
ClusterState clusterState3 = clusterService3.state();
RoutingNode routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
RoutingNode routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(3));
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED) + routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11));
@ -295,11 +295,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
Thread.sleep(200);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6)));
clusterState3 = clusterService3.state();
routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6)));
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11));
@ -310,11 +310,11 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
assertThat(deleteIndexResponse.acknowledged(), equalTo(true));
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
routingNodeEntry2 = clusterState2.readOnlyRoutingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2, nullValue());
clusterState3 = clusterService3.state();
routingNodeEntry3 = clusterState3.routingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
routingNodeEntry3 = clusterState3.readOnlyRoutingNodes().nodesToShards().get(clusterState3.nodes().localNodeId());
assertThat(routingNodeEntry3, nullValue());
}

View File

@ -2,11 +2,13 @@ log4j.rootLogger=INFO, out
log4j.logger.jgroups=WARN
#log4j.logger.discovery=TRACE
#log4j.logger.cluster=TRACE
#log4j.logger.cluster.service=TRACE
#log4j.logger.cluster.action.shard=DEBUG
#log4j.logger.indices.cluster=DEBUG
#log4j.logger.index=TRACE
#log4j.logger.index.engine=DEBUG
#log4j.logger.index.shard.recovery=TRACE
#log4j.logger.index.shard.service=DEBUG
#log4j.logger.index.shard.recovery=DEBUG
#log4j.logger.index.cache=DEBUG
#log4j.logger.http=TRACE
#log4j.logger.monitor.memory=TRACE