Non-data master nodes and non-master data nodes fail to store data, closes #579.

This commit is contained in:
kimchy 2010-12-30 12:10:40 +02:00
parent e3322836b5
commit ed996c3e85
7 changed files with 89 additions and 12 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
@ -65,8 +66,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
return new ClusterHealthResponse();
}
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState state) throws ElasticSearchException {
int waitFor = 4;
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState unusedState) throws ElasticSearchException {
int waitFor = 5;
if (request.waitForStatus() == null) {
waitFor--;
}
@ -79,14 +80,19 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (request.waitForNodes().isEmpty()) {
waitFor--;
}
if (request.indices().length == 0) { // check that they actually exists in the meta data
waitFor--;
}
if (waitFor == 0) {
// no need to wait for anything
return clusterHealth(request);
ClusterState clusterState = clusterService.state();
return clusterHealth(request, clusterState);
}
long endTime = System.currentTimeMillis() + request.timeout().millis();
while (true) {
int waitForCounter = 0;
ClusterHealthResponse response = clusterHealth(request);
ClusterState clusterState = clusterService.state();
ClusterHealthResponse response = clusterHealth(request, clusterState);
if (request.waitForStatus() != null && response.status().value() <= request.waitForStatus().value()) {
waitForCounter++;
}
@ -96,6 +102,14 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (request.waitForActiveShards() != -1 && response.activeShards() >= request.waitForActiveShards()) {
waitForCounter++;
}
if (request.indices().length > 0) {
try {
clusterState.metaData().concreteIndices(request.indices());
waitForCounter++;
} catch (IndexMissingException e) {
// missing indices, wait a bit more...
}
}
if (!request.waitForNodes().isEmpty()) {
if (request.waitForNodes().startsWith(">=")) {
int expected = Integer.parseInt(request.waitForNodes().substring(2));
@ -161,14 +175,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
}
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request) {
ClusterState clusterState = clusterService.state();
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState) {
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures());
response.numberOfNodes = clusterState.nodes().size();
response.numberOfDataNodes = clusterState.nodes().dataNodes().size();
for (String index : clusterState.metaData().concreteIndices(request.indices())) {
for (String index : clusterState.metaData().concreteIndicesIgnoreMissing(request.indices())) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexRoutingTable == null) {

View File

@ -85,7 +85,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
}
if (request.filteredIndices().length > 0) {
String[] indices = currentState.metaData().concreteIndices(request.filteredIndices(), true);
String[] indices = currentState.metaData().concreteIndicesIgnoreMissing(request.filteredIndices());
for (String filteredIndex : indices) {
IndexMetaData indexMetaData = currentState.metaData().index(filteredIndex);
if (indexMetaData != null) {

View File

@ -138,10 +138,20 @@ public class MetaData implements Iterable<IndexMetaData> {
return concreteAllIndices();
}
/**
 * Resolves the supplied index names (which may be aliases) to actual index names.
 *
 * <p>Thin convenience wrapper: forwards to the two-argument {@code concreteIndices}
 * overload with its boolean argument set to {@code false}.
 *
 * @param indices the index names or aliases to resolve
 * @return whatever the two-argument overload returns for these names
 * @throws IndexMissingException declared by this method; presumably raised by the
 *         overload when a requested index does not exist (confirm against the overload)
 */
public String[] concreteIndices(String[] indices) throws IndexMissingException {
    // NOTE(review): the flag's name is inferred from the sibling method
    // concreteIndicesIgnoreMissing, which passes true here; confirm against the overload.
    final boolean ignoreMissing = false;
    return concreteIndices(indices, ignoreMissing);
}
/**
 * Resolves the supplied index names (which may be aliases) to actual index names,
 * without declaring {@code IndexMissingException}.
 *
 * <p>Thin convenience wrapper: forwards to the two-argument {@code concreteIndices}
 * overload with its boolean argument set to {@code true}.
 *
 * @param indices the index names or aliases to resolve
 * @return whatever the two-argument overload returns for these names
 */
public String[] concreteIndicesIgnoreMissing(String[] indices) {
    // NOTE(review): judging by this method's name, the flag presumably makes the
    // overload skip indices that do not exist instead of failing; confirm.
    final boolean ignoreMissing = true;
    return concreteIndices(indices, ignoreMissing);
}
/**
* Translates the provided indices (possibly aliased) into actual indices.
*/

View File

@ -42,6 +42,10 @@ import static org.elasticsearch.common.transport.TransportAddressSerializers.*;
*/
public class DiscoveryNode implements Streamable, Serializable {
/**
 * Decides, from the node settings alone, whether this node requires local storage.
 *
 * <p>The answer is {@code false} when {@code node.client} reads as true, or when both
 * {@code node.data} and {@code node.master} read as false. Otherwise it is {@code true}.
 * The lookups pass {@code false} for {@code node.client} and {@code true} for the other
 * two as the second argument to {@code getAsBoolean} (presumably the value used when
 * the key is absent; confirm).
 *
 * @param settings the node settings to inspect
 * @return {@code true} if the node is not a client and is a data node or a master node
 */
public static boolean nodeRequiresLocalStorage(Settings settings) {
    // Client nodes never need local storage, whatever their other flags say.
    if (settings.getAsBoolean("node.client", false)) {
        return false;
    }
    // Being either a data node or a master node is enough to need storage.
    return settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.master", true);
}
public static Map<String, String> buildCommonNodesAttributes(Settings settings) {
Map<String, String> attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap());
if (attributes.containsKey("client")) {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.env;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NativeFSLockFactory;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -45,8 +46,7 @@ public class NodeEnvironment extends AbstractComponent {
@Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings);
if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false) ||
!settings.getAsBoolean("node.master", true)) {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodeFile = null;
lock = null;
localNodeId = -1;

View File

@ -354,7 +354,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
initialized = true;
// if this node is neither a possible master node nor a data node, bail, we won't save anything here...
if (!clusterService.localNode().masterNode() || !clusterService.localNode().dataNode()) {
if (!clusterService.localNode().masterNode() && !clusterService.localNode().dataNode()) {
location = null;
} else {
// create the location where the state will be stored

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.AbstractNodesTests;
@ -52,7 +53,9 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) {
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
}
closeAllNodes();
@ -221,4 +224,51 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "2").setSource("field1", "value1").execute().actionGet();
}
/**
 * Checks that a single node started with {@code node.data=false} and the local gateway
 * still has the index after a restart: the node creates index "test", is closed and
 * started again with the same settings, and the cluster state must then contain "test".
 */
@Test public void testJustMasterNode() throws Exception {
    final String masterNode = "node1";
    final String indexName = "test";

    logger.info("--> cleaning nodes");
    // Build both nodes with the local gateway so cleanAndCloseNodes() has nodes to clean.
    for (String nodeName : new String[]{"node1", "node2"}) {
        buildNode(nodeName, settingsBuilder().put("gateway.type", "local").build());
    }
    cleanAndCloseNodes();

    logger.info("--> starting 1 master node non data");
    startNode(masterNode, settingsBuilder()
            .put("node.data", false)
            .put("gateway.type", "local")
            .put("index.number_of_shards", 2)
            .put("index.number_of_replicas", 1)
            .build());

    logger.info("--> create an index");
    client(masterNode).admin().indices().prepareCreate(indexName).execute().actionGet();

    logger.info("--> closing master node");
    closeNode(masterNode);

    logger.info("--> starting 1 master node non data again");
    startNode(masterNode, settingsBuilder()
            .put("node.data", false)
            .put("gateway.type", "local")
            .put("index.number_of_shards", 2)
            .put("index.number_of_replicas", 1)
            .build());

    logger.info("--> waiting for test index to be created");
    // The health request is scoped to the index; the test only asserts that it did not time out.
    ClusterHealthResponse healthResponse = client(masterNode).admin().cluster()
            .prepareHealth()
            .setIndices(indexName)
            .execute().actionGet();
    assertThat(healthResponse.timedOut(), equalTo(false));

    logger.info("--> verify we have an index");
    ClusterStateResponse stateResponse = client(masterNode).admin().cluster()
            .prepareState()
            .setFilterIndices(indexName)
            .execute().actionGet();
    assertThat(stateResponse.state().metaData().hasIndex(indexName), equalTo(true));
}
/**
 * Checks a two-node cluster where "node1" is started with {@code node.data=false} and
 * "node2" with {@code node.master=false}, both on the local gateway: after creating
 * index "test" and waiting for yellow status, indexing one document with a 100ms
 * timeout must complete without throwing.
 */
@Test public void testJustMasterNodeAndJustDataNode() throws Exception {
    final String masterOnlyNode = "node1";
    final String dataOnlyNode = "node2";
    final String indexName = "test";

    logger.info("--> cleaning nodes");
    // Build both nodes with the local gateway so cleanAndCloseNodes() has nodes to clean.
    for (String nodeName : new String[]{"node1", "node2"}) {
        buildNode(nodeName, settingsBuilder().put("gateway.type", "local").build());
    }
    cleanAndCloseNodes();

    logger.info("--> starting 1 master node non data");
    startNode(masterOnlyNode, settingsBuilder()
            .put("node.data", false)
            .put("gateway.type", "local")
            .put("index.number_of_shards", 2)
            .put("index.number_of_replicas", 1)
            .build());
    // The second node is started with node.master=false.
    startNode(dataOnlyNode, settingsBuilder()
            .put("node.master", false)
            .put("gateway.type", "local")
            .put("index.number_of_shards", 2)
            .put("index.number_of_replicas", 1)
            .build());

    logger.info("--> create an index");
    client(masterOnlyNode).admin().indices().prepareCreate(indexName).execute().actionGet();

    logger.info("--> waiting for test index to be created");
    ClusterHealthResponse healthResponse = client(masterOnlyNode).admin().cluster()
            .prepareHealth()
            .setIndices(indexName)
            .setWaitForYellowStatus()
            .execute().actionGet();
    assertThat(healthResponse.timedOut(), equalTo(false));

    // No assertion on the result: the call is only expected to complete without throwing.
    client(masterOnlyNode).prepareIndex(indexName, "type1")
            .setSource("field1", "value1")
            .setTimeout("100ms")
            .execute().actionGet();
}
}