Write state also on data nodes if not master eligible

When a node was a data node only then the index state was not written.
In case this node connected to a master that did not have the index
in the cluster state, for example because a master was restarted and
the data folder was lost, then the indices were not imported as dangling
but instead deleted.
This commit makes sure that index state for data nodes is also written
if they have at least one shard of this index allocated.

closes #8823
closes #9952
This commit is contained in:
Britta Weber 2015-03-02 10:51:01 +01:00
parent 7aa4c7e256
commit 4088dd38cb
3 changed files with 743 additions and 36 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.gateway;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -27,9 +28,7 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -43,6 +42,7 @@ import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
/**
*
@ -57,7 +57,9 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final DanglingIndicesState danglingIndicesState;
@Nullable
private volatile MetaData currentMetaData;
private volatile MetaData previousMetaData;
private volatile ImmutableSet<String> previouslyWrittenIndices = ImmutableSet.of();
@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
@ -76,7 +78,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
if (DiscoveryNode.masterNode(settings)) {
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
pre20Upgrade();
@ -96,10 +98,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
@Override
public void clusterChanged(ClusterChangedEvent event) {
Set<String> relevantIndices = new HashSet<>();
final ClusterState state = event.state();
if (state.blocks().disableStatePersistence()) {
// reset the current metadata, we need to start fresh...
this.currentMetaData = null;
this.previousMetaData = null;
previouslyWrittenIndices= ImmutableSet.of();
return;
}
@ -107,44 +111,47 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
// we don't check if metaData changed, since we might be called several times and we need to check dangling...
boolean success = true;
// only applied to master node, writing the global and index level states
if (state.nodes().localNode().masterNode()) {
// write the state if this node is a master eligible node or if it is a data node and has shards allocated on it
if (state.nodes().localNode().masterNode() || state.nodes().localNode().dataNode()) {
// check if the global state changed?
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) {
try {
metaStateService.writeGlobalState("changed", newMetaData);
// we determine if or if not we write meta data on data only nodes by looking at the shard routing
// and only write if a shard of this index is allocated on this node
// however, closed indices do not appear in the shard routing. if the meta data for a closed index is
// updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state
// persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode().
// we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list
if (isDataOnlyNode(state)) {
ImmutableSet.Builder<String> previouslyWrittenIndicesBuilder = ImmutableSet.builder();
for (IndexMetaData indexMetaData : newMetaData) {
IndexMetaData indexMetaDataOnDisk = null;
if (indexMetaData.state().equals(IndexMetaData.State.CLOSE)) {
try {
indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.index());
} catch (IOException ex) {
throw new ElasticsearchException("failed to load index state", ex);
}
}
if (indexMetaDataOnDisk != null) {
previouslyWrittenIndicesBuilder.add(indexMetaDataOnDisk.index());
}
}
previouslyWrittenIndices = previouslyWrittenIndicesBuilder.addAll(previouslyWrittenIndices).build();
}
} catch (Throwable e) {
success = false;
}
}
Iterable<IndexMetaWriteInfo> writeInfo;
relevantIndices = getRelevantIndices(event.state(), previouslyWrittenIndices);
writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());
// check and write changes in indices
for (IndexMetaData indexMetaData : newMetaData) {
String writeReason = null;
IndexMetaData currentIndexMetaData;
if (currentMetaData == null) {
// a new event..., check from the state stored
try {
currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
} catch (IOException ex) {
throw new ElasticsearchException("failed to load index state", ex);
}
} else {
currentIndexMetaData = currentMetaData.index(indexMetaData.index());
}
if (currentIndexMetaData == null) {
writeReason = "freshly created";
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
}
// we update the writeReason only if we really need to write it
if (writeReason == null) {
continue;
}
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
try {
metaStateService.writeIndex(writeReason, indexMetaData, currentIndexMetaData);
metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData);
} catch (Throwable e) {
success = false;
}
@ -154,10 +161,29 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
danglingIndicesState.processDanglingIndices(newMetaData);
if (success) {
currentMetaData = newMetaData;
previousMetaData = newMetaData;
ImmutableSet.Builder<String> builder= ImmutableSet.builder();
previouslyWrittenIndices = builder.addAll(relevantIndices).build();
}
}
public static Set<String> getRelevantIndices(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
Set<String> relevantIndices;
if (isDataOnlyNode(state)) {
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previouslyWrittenIndices);
} else if (state.nodes().localNode().masterNode() == true) {
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
} else {
relevantIndices = Collections.emptySet();
}
return relevantIndices;
}
protected static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().localNode().masterNode() == false) && state.nodes().localNode().dataNode());
}
/**
* Throws an IAE if a pre 0.19 state is detected
*/
@ -229,7 +255,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
}
}
if (hasCustomPre20HashFunction|| pre20UseType != null) {
if (hasCustomPre20HashFunction || pre20UseType != null) {
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
}
@ -251,4 +277,82 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
}
}
/**
* Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
* Each index state that should be written to disk will be returned. This is only run for data only nodes.
* It will return only the states for indices that actually have a shard allocated on the current node.
*
* @param previouslyWrittenIndices A list of indices for which the state was already written before
* @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written
* @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now
* @param newMetaData The new metadata
* @return iterable over all indices states that should be written to disk
*/
public static Iterable<GatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(ImmutableSet<String> previouslyWrittenIndices, Set<String> potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) {
List<GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new ArrayList<>();
for (String index : potentiallyUnwrittenIndices) {
IndexMetaData newIndexMetaData = newMetaData.index(index);
IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
String writeReason = null;
if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) {
writeReason = "freshly created";
} else if (previousIndexMetaData.version() != newIndexMetaData.version()) {
writeReason = "version changed from [" + previousIndexMetaData.version() + "] to [" + newIndexMetaData.version() + "]";
}
if (writeReason != null) {
indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason));
}
}
return indicesToWrite;
}
public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
}
Set<String> indices = new HashSet<>();
for (MutableShardRouting routing : newRoutingNode) {
indices.add(routing.index());
}
// we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
for (IndexMetaData indexMetaData : state.metaData()) {
if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && state.metaData().getIndices().get(indexMetaData.getIndex()).state().equals(IndexMetaData.State.CLOSE)) {
indices.add(indexMetaData.getIndex());
}
}
return indices;
}
public static Set<String> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<String> relevantIndices;
relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices
for (IndexMetaData indexMetaData : state.metaData()) {
relevantIndices.add(indexMetaData.getIndex());
}
return relevantIndices;
}
public static class IndexMetaWriteInfo {
final IndexMetaData newMetaData;
final String reason;
final IndexMetaData previousMetaData;
public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) {
this.newMetaData = newMetaData;
this.reason = reason;
this.previousMetaData = previousMetaData;
}
public IndexMetaData getNewMetaData() {
return newMetaData;
}
public String getReason() {
return reason;
}
}
}

View File

@ -0,0 +1,249 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
* Test IndexMetaState for master and data only nodes return correct list of indices to write
* There are many parameters:
* - meta state is not in memory
* - meta state is in memory with old version/ new version
* - meta state is in memory with new version
* - version changed in cluster state event/ no change
* - node is data only node
* - node is master eligible
* for data only nodes: shard initializing on shard
*/
public class GatewayMetaStateTests extends ElasticsearchAllocationTestCase {
ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) {
//ridiculous settings to make sure we don't run into uninitialized because fo default
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
.build());
ClusterState newClusterState, previousClusterState;
MetaData metaDataOldClusterState = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2))
.build();
RoutingTable routingTableOldClusterState = RoutingTable.builder()
.addAsNew(metaDataOldClusterState.index("test"))
.build();
// assign all shards
ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaDataOldClusterState)
.routingTable(routingTableOldClusterState)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
// new cluster state will have initializing shards on node 1
RoutingTable routingTableNewClusterState = strategy.reroute(init).routingTable();
if (initializing == false) {
// pretend all initialized, nothing happened
ClusterState temp = ClusterState.builder(init).routingTable(routingTableNewClusterState).metaData(metaDataOldClusterState).build();
routingTableNewClusterState = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
routingTableOldClusterState = routingTableNewClusterState;
} else {
// nothing to do, we have one routing table with unassigned and one with initializing
}
// create new meta data either with version changed or not
MetaData metaDataNewClusterState = MetaData.builder()
.put(init.metaData().index("test"), versionChanged)
.build();
// create the cluster states with meta data and routing tables as computed before
previousClusterState = ClusterState.builder(init)
.metaData(metaDataOldClusterState)
.routingTable(routingTableOldClusterState)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
newClusterState = ClusterState.builder(previousClusterState).routingTable(routingTableNewClusterState).metaData(metaDataNewClusterState).version(previousClusterState.getVersion() + 1).build();
ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState);
assertThat(event.state().version(), equalTo(event.previousState().version() + 1));
return event;
}
ClusterChangedEvent generateCloseEvent(boolean masterEligible) {
//ridiculous settings to make sure we don't run into uninitialized because fo default
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
.build());
ClusterState newClusterState, previousClusterState;
MetaData metaDataIndexCreated = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2))
.build();
RoutingTable routingTableIndexCreated = RoutingTable.builder()
.addAsNew(metaDataIndexCreated.index("test"))
.build();
// assign all shards
ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaDataIndexCreated)
.routingTable(routingTableIndexCreated)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
RoutingTable routingTableInitializing = strategy.reroute(init).routingTable();
ClusterState temp = ClusterState.builder(init).routingTable(routingTableInitializing).build();
RoutingTable routingTableStarted = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
// create new meta data either with version changed or not
MetaData metaDataStarted = MetaData.builder()
.put(init.metaData().index("test"), true)
.build();
// create the cluster states with meta data and routing tables as computed before
MetaData metaDataClosed = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE).numberOfShards(5).numberOfReplicas(2)).version(metaDataStarted.version() + 1)
.build();
previousClusterState = ClusterState.builder(init)
.metaData(metaDataStarted)
.routingTable(routingTableStarted)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
newClusterState = ClusterState.builder(previousClusterState)
.routingTable(routingTableIndexCreated)
.metaData(metaDataClosed)
.version(previousClusterState.getVersion() + 1).build();
ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState);
assertThat(event.state().version(), equalTo(event.previousState().version() + 1));
return event;
}
private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) {
Map<String, String> masterNodeAttributes = new HashMap<>();
masterNodeAttributes.put("master", "true");
masterNodeAttributes.put("data", "true");
Map<String, String> dataNodeAttributes = new HashMap<>();
dataNodeAttributes.put("master", "false");
dataNodeAttributes.put("data", "true");
return DiscoveryNodes.builder().put(newNode("node1", masterEligible ? masterNodeAttributes : dataNodeAttributes)).put(newNode("master_node", masterNodeAttributes)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node");
}
public void assertState(ClusterChangedEvent event,
boolean stateInMemory,
boolean expectMetaData) throws Exception {
MetaData inMemoryMetaData = null;
ImmutableSet<String> oldIndicesList = ImmutableSet.of();
if (stateInMemory) {
inMemoryMetaData = event.previousState().metaData();
ImmutableSet.Builder<String> relevantIndices = ImmutableSet.builder();
oldIndicesList = relevantIndices.addAll(GatewayMetaState.getRelevantIndices(event.previousState(), oldIndicesList)).build();
}
Set<String> newIndicesList = GatewayMetaState.getRelevantIndices(event.state(), oldIndicesList);
// third, get the actual write info
Iterator<GatewayMetaState.IndexMetaWriteInfo> indices = GatewayMetaState.resolveStatesToBeWritten(oldIndicesList, newIndicesList, inMemoryMetaData, event.state().metaData()).iterator();
if (expectMetaData) {
assertThat(indices.hasNext(), equalTo(true));
assertThat(indices.next().getNewMetaData().index(), equalTo("test"));
assertThat(indices.hasNext(), equalTo(false));
} else {
assertThat(indices.hasNext(), equalTo(false));
}
}
@Test
public void testVersionChangeIsAlwaysWritten() throws Exception {
// test that version changes are always written
boolean initializing = randomBoolean();
boolean versionChanged = true;
boolean stateInMemory = randomBoolean();
boolean masterEligible = randomBoolean();
boolean expectMetaData = true;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
@Test
public void testNewShardsAlwaysWritten() throws Exception {
// make sure new shards on data only node always written
boolean initializing = true;
boolean versionChanged = randomBoolean();
boolean stateInMemory = randomBoolean();
boolean masterEligible = false;
boolean expectMetaData = true;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
@Test
public void testAllUpToDateNothingWritten() throws Exception {
// make sure state is not written again if we wrote already
boolean initializing = false;
boolean versionChanged = false;
boolean stateInMemory = true;
boolean masterEligible = randomBoolean();
boolean expectMetaData = false;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
@Test
public void testNoWriteIfNothingChanged() throws Exception {
boolean initializing = false;
boolean versionChanged = false;
boolean stateInMemory = true;
boolean masterEligible = randomBoolean();
boolean expectMetaData = false;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
ClusterChangedEvent newEventWithNothingChanged = new ClusterChangedEvent("test cluster state", event.state(), event.state());
assertState(newEventWithNothingChanged, stateInMemory, expectMetaData);
}
@Test
public void testWriteClosedIndex() throws Exception {
// test that the closing of an index is written also on data only node
boolean masterEligible = randomBoolean();
boolean expectMetaData = true;
boolean stateInMemory = true;
ClusterChangedEvent event = generateCloseEvent(masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
}

View File

@ -0,0 +1,354 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class MetaDataWriteDataNodesTests extends ElasticsearchIntegrationTest {
@Test
public void testMetaWrittenAlsoOnDataNode() throws Exception {
// this test checks that index state is written on data only nodes
String masterNodeName = startMasterNode();
String redNode = startDataNode("red");
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0)));
index("test", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
waitForConcreteMappingsOnAll("test", "doc", "text");
ensureGreen("test");
assertIndexInMetaState(redNode, "test");
assertIndexInMetaState(masterNodeName, "test");
//stop master node and start again with an empty data folder
((InternalTestCluster) cluster()).stopCurrentMasterNode();
String newMasterNode = startMasterNode();
ensureGreen("test");
// wait for mapping also on master becasue then we can be sure the state was written
waitForConcreteMappingsOnAll("test", "doc", "text");
// check for meta data
assertIndexInMetaState(redNode, "test");
assertIndexInMetaState(newMasterNode, "test");
// check if index and doc is still there
ensureGreen("test");
assertTrue(client().prepareGet("test", "doc", "1").get().isExists());
}
@Test
public void testMetaWrittenOnlyForIndicesOnNodesThatHaveAShard() throws Exception {
// this test checks that the index state is only written to a data only node if they have a shard of that index allocated on the node
String masterNode = startMasterNode();
String blueNode = startDataNode("blue");
String redNode = startDataNode("red");
assertAcked(prepareCreate("blue_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")));
index("blue_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
waitForConcreteMappingsOnAll("blue_index", "doc", "text");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexInMetaState(blueNode, "blue_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
// not the index state for blue_index should only be written on blue_node and the for red_index only on red_node
// we restart red node and master but with empty data folders
stopNode(redNode);
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
redNode = startDataNode("red");
ensureGreen();
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(blueNode, "blue_index");
assertIndexNotInMetaState(redNode, "red_index");
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexNotInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
// check that blue index is still there
assertFalse(client().admin().indices().prepareExists("red_index").get().isExists());
assertTrue(client().prepareGet("blue_index", "doc", "1").get().isExists());
// red index should be gone
// if the blue node had stored the index state then cluster health would be red and red_index would exist
assertFalse(client().admin().indices().prepareExists("red_index").get().isExists());
}
@Test
public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception {
// this test checks that the index state is removed from a data only node once all shards have been allocated away from it
String masterNode = startMasterNode();
String blueNode = startDataNode("blue");
String redNode = startDataNode("red");
// create blue_index on blue_node and same for red
client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("3")).get();
assertAcked(prepareCreate("blue_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")));
index("blue_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(blueNode, "blue_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
// now relocate blue_index to red_node and red_index to blue_node
logger.debug("relocating indices...");
client().admin().indices().prepareUpdateSettings("blue_index").setSettings(ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")).get();
client().admin().indices().prepareUpdateSettings("red_index").setSettings(ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")).get();
client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get();
ensureGreen();
assertIndexNotInMetaState(redNode, "red_index");
assertIndexNotInMetaState(blueNode, "blue_index");
assertIndexInMetaState(redNode, "blue_index");
assertIndexInMetaState(blueNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
waitForConcreteMappingsOnAll("blue_index", "doc", "text");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
//at this point the blue_index is on red node and the red_index on blue node
// now, when we start red and master node again but without data folder, the red index should be gone but the blue index should initialize fine
stopNode(redNode);
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
redNode = startDataNode("red");
ensureGreen();
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexNotInMetaState(blueNode, "blue_index");
assertIndexNotInMetaState(redNode, "red_index");
assertIndexInMetaState(blueNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexNotInMetaState(masterNode, "blue_index");
assertTrue(client().prepareGet("red_index", "doc", "1").get().isExists());
// if the red_node had stored the index state then cluster health would be red and blue_index would exist
assertFalse(client().admin().indices().prepareExists("blue_index").get().isExists());
}
@Test
public void testMetaWrittenWhenIndexIsClosed() throws Exception {
String masterNode = startMasterNode();
String redNodeDataPath = createTempDir().toString();
String redNode = startDataNode("red", redNodeDataPath);
String blueNode = startDataNode("blue");
// create red_index on red_node and same for red
client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("3")).get();
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
client().admin().indices().prepareClose("red_index").get();
// close the index
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
// restart master with empty data folder and maybe red node
boolean restartRedNode = randomBoolean();
//at this point the red_index on red node
if (restartRedNode) {
stopNode(redNode);
}
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
if (restartRedNode) {
redNode = startDataNode("red", redNodeDataPath);
}
ensureGreen("red_index");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
// open the index again
client().admin().indices().prepareOpen("red_index").get();
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.OPEN.name()));
// restart again
ensureGreen();
if (restartRedNode) {
stopNode(redNode);
}
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
if (restartRedNode) {
redNode = startDataNode("red", redNodeDataPath);
}
ensureGreen("red_index");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.OPEN.name()));
assertTrue(client().prepareGet("red_index", "doc", "1").get().isExists());
}
@Test
public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception {
String masterNode = startMasterNode();
String redNodeDataPath = createTempDir().toString();
String redNode = startDataNode("red", redNodeDataPath);
// create red_index on red_node and same for red
client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2")).get();
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
logger.info("--> wait for green red_index");
ensureGreen();
logger.info("--> wait for meta state written for red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
logger.info("--> close red_index");
client().admin().indices().prepareClose("red_index").get();
// close the index
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
logger.info("--> restart red node");
stopNode(redNode);
redNode = startDataNode("red", redNodeDataPath);
client().admin().indices().preparePutMapping("red_index").setType("doc").setSource(jsonBuilder().startObject()
.startObject("properties")
.startObject("integer_field")
.field("type", "integer")
.endObject()
.endObject()
.endObject()).get();
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("red_index").addTypes("doc").get();
assertNotNull(((LinkedHashMap)(getMappingsResponse.getMappings().get("red_index").get("doc").getSourceAsMap().get("properties"))).get("integer_field"));
// restart master with empty data folder and maybe red node
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
ensureGreen("red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
getMappingsResponse = client().admin().indices().prepareGetMappings("red_index").addTypes("doc").get();
assertNotNull(((LinkedHashMap)(getMappingsResponse.getMappings().get("red_index").get("doc").getSourceAsMap().get("properties"))).get("integer_field"));
}
private String startDataNode(String color) {
return startDataNode(color, createTempDir().toString());
}
private String startDataNode(String color, String newDataPath) {
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder()
.put("node.data", true)
.put("node.master", false)
.put("node.color", color)
.put("path.data", newDataPath);
return internalCluster().startNode(settingsBuilder.build());
}
private String startMasterNode() {
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder()
.put("node.data", false)
.put("node.master", true)
.put("path.data", createTempDir().toString());
return internalCluster().startNode(settingsBuilder.build());
}
private void stopNode(String name) throws IOException {
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(name));
}
protected void assertIndexNotInMetaState(String nodeName, String indexName) throws Exception {
assertMetaState(nodeName, indexName, false);
}
protected void assertIndexInMetaState(String nodeName, String indexName) throws Exception {
assertMetaState(nodeName, indexName, true);
}
private void assertMetaState(final String nodeName, final String indexName, final boolean shouldBe) throws Exception {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
logger.info("checking if meta state exists...");
return shouldBe == metaStateExists(nodeName, indexName);
}
});
boolean inMetaSate = metaStateExists(nodeName, indexName);
if (shouldBe) {
assertTrue("expected " + indexName + " in meta state of node " + nodeName, inMetaSate);
} else {
assertFalse("expected " + indexName + " to not be in meta state of node " + nodeName, inMetaSate);
}
}
private boolean metaStateExists(String nodeName, String indexName) {
GatewayMetaState redNodeMetaState = ((InternalTestCluster) cluster()).getInstance(GatewayMetaState.class, nodeName);
MetaData redNodeMetaData = null;
try {
redNodeMetaData = redNodeMetaState.loadMetaState();
} catch (Exception e) {
fail("failed to load meta state");
}
ImmutableOpenMap<String, IndexMetaData> indices = redNodeMetaData.getIndices();
boolean inMetaSate = false;
for (ObjectObjectCursor<String, IndexMetaData> index : indices) {
inMetaSate = inMetaSate || index.key.equals(indexName);
}
return inMetaSate;
}
}