Merge branch 'master' into feature/aggs_2_0

This commit is contained in:
Colin Goodheart-Smithe 2015-04-29 16:10:19 +01:00
commit 88aa8934a3
4 changed files with 55 additions and 743 deletions

View File

@ -99,3 +99,22 @@ By default, `BulkProcessor`:
* does not set flushInterval
* sets concurrentRequests to 1
When all documents are loaded to the `BulkProcessor` it can be closed by using `awaitClose` or `close` methods:
[source,java]
--------------------------------------------------
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------
or
[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------
Both methods flush any remaining documents and disable all other scheduled flushes if they were scheduled by setting
`flushInterval`. If concurrent requests were enabled the `awaitClose` method waits for up to the specified timeout for
all bulk requests to complete then returns `true`, if the specified waiting time elapses before all bulk requests complete,
`false` is returned. The `close` method doesn't wait for any remaining bulk requests to complete and exists immediately.

View File

@ -19,7 +19,6 @@
package org.elasticsearch.gateway;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -28,7 +27,9 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -42,7 +43,6 @@ import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
/**
*
@ -57,9 +57,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final DanglingIndicesState danglingIndicesState;
@Nullable
private volatile MetaData previousMetaData;
private volatile ImmutableSet<String> previouslyWrittenIndices = ImmutableSet.of();
private volatile MetaData currentMetaData;
@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
@ -78,7 +76,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
if (DiscoveryNode.masterNode(settings)) {
try {
ensureNoPre019State();
pre20Upgrade();
@ -98,12 +96,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
@Override
public void clusterChanged(ClusterChangedEvent event) {
Set<String> relevantIndices = new HashSet<>();
final ClusterState state = event.state();
if (state.blocks().disableStatePersistence()) {
// reset the current metadata, we need to start fresh...
this.previousMetaData = null;
previouslyWrittenIndices= ImmutableSet.of();
this.currentMetaData = null;
return;
}
@ -111,47 +107,44 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
// we don't check if metaData changed, since we might be called several times and we need to check dangling...
boolean success = true;
// write the state if this node is a master eligible node or if it is a data node and has shards allocated on it
if (state.nodes().localNode().masterNode() || state.nodes().localNode().dataNode()) {
// only applied to master node, writing the global and index level states
if (state.nodes().localNode().masterNode()) {
// check if the global state changed?
if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) {
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
try {
metaStateService.writeGlobalState("changed", newMetaData);
// we determine if or if not we write meta data on data only nodes by looking at the shard routing
// and only write if a shard of this index is allocated on this node
// however, closed indices do not appear in the shard routing. if the meta data for a closed index is
// updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state
// persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode().
// we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list
if (isDataOnlyNode(state)) {
ImmutableSet.Builder<String> previouslyWrittenIndicesBuilder = ImmutableSet.builder();
for (IndexMetaData indexMetaData : newMetaData) {
IndexMetaData indexMetaDataOnDisk = null;
if (indexMetaData.state().equals(IndexMetaData.State.CLOSE)) {
try {
indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.index());
} catch (IOException ex) {
throw new ElasticsearchException("failed to load index state", ex);
}
}
if (indexMetaDataOnDisk != null) {
previouslyWrittenIndicesBuilder.add(indexMetaDataOnDisk.index());
}
}
previouslyWrittenIndices = previouslyWrittenIndicesBuilder.addAll(previouslyWrittenIndices).build();
}
} catch (Throwable e) {
success = false;
}
}
Iterable<IndexMetaWriteInfo> writeInfo;
relevantIndices = getRelevantIndices(event.state(), previouslyWrittenIndices);
writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());
// check and write changes in indices
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
for (IndexMetaData indexMetaData : newMetaData) {
String writeReason = null;
IndexMetaData currentIndexMetaData;
if (currentMetaData == null) {
// a new event..., check from the state stored
try {
metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData);
currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
} catch (IOException ex) {
throw new ElasticsearchException("failed to load index state", ex);
}
} else {
currentIndexMetaData = currentMetaData.index(indexMetaData.index());
}
if (currentIndexMetaData == null) {
writeReason = "freshly created";
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
}
// we update the writeReason only if we really need to write it
if (writeReason == null) {
continue;
}
try {
metaStateService.writeIndex(writeReason, indexMetaData, currentIndexMetaData);
} catch (Throwable e) {
success = false;
}
@ -161,29 +154,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
danglingIndicesState.processDanglingIndices(newMetaData);
if (success) {
previousMetaData = newMetaData;
ImmutableSet.Builder<String> builder= ImmutableSet.builder();
previouslyWrittenIndices = builder.addAll(relevantIndices).build();
currentMetaData = newMetaData;
}
}
public static Set<String> getRelevantIndices(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
Set<String> relevantIndices;
if (isDataOnlyNode(state)) {
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previouslyWrittenIndices);
} else if (state.nodes().localNode().masterNode() == true) {
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
} else {
relevantIndices = Collections.emptySet();
}
return relevantIndices;
}
protected static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().localNode().masterNode() == false) && state.nodes().localNode().dataNode());
}
/**
* Throws an IAE if a pre 0.19 state is detected
*/
@ -277,82 +251,4 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
}
}
/**
* Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
* Each index state that should be written to disk will be returned. This is only run for data only nodes.
* It will return only the states for indices that actually have a shard allocated on the current node.
*
* @param previouslyWrittenIndices A list of indices for which the state was already written before
* @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written
* @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now
* @param newMetaData The new metadata
* @return iterable over all indices states that should be written to disk
*/
public static Iterable<GatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(ImmutableSet<String> previouslyWrittenIndices, Set<String> potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) {
List<GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new ArrayList<>();
for (String index : potentiallyUnwrittenIndices) {
IndexMetaData newIndexMetaData = newMetaData.index(index);
IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
String writeReason = null;
if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) {
writeReason = "freshly created";
} else if (previousIndexMetaData.version() != newIndexMetaData.version()) {
writeReason = "version changed from [" + previousIndexMetaData.version() + "] to [" + newIndexMetaData.version() + "]";
}
if (writeReason != null) {
indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason));
}
}
return indicesToWrite;
}
public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
}
Set<String> indices = new HashSet<>();
for (MutableShardRouting routing : newRoutingNode) {
indices.add(routing.index());
}
// we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
for (IndexMetaData indexMetaData : state.metaData()) {
if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && state.metaData().getIndices().get(indexMetaData.getIndex()).state().equals(IndexMetaData.State.CLOSE)) {
indices.add(indexMetaData.getIndex());
}
}
return indices;
}
public static Set<String> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<String> relevantIndices;
relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices
for (IndexMetaData indexMetaData : state.metaData()) {
relevantIndices.add(indexMetaData.getIndex());
}
return relevantIndices;
}
public static class IndexMetaWriteInfo {
final IndexMetaData newMetaData;
final String reason;
final IndexMetaData previousMetaData;
public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) {
this.newMetaData = newMetaData;
this.reason = reason;
this.previousMetaData = previousMetaData;
}
public IndexMetaData getNewMetaData() {
return newMetaData;
}
public String getReason() {
return reason;
}
}
}

View File

@ -1,249 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
* Test IndexMetaState for master and data only nodes return correct list of indices to write
* There are many parameters:
* - meta state is not in memory
* - meta state is in memory with old version/ new version
* - meta state is in memory with new version
* - version changed in cluster state event/ no change
* - node is data only node
* - node is master eligible
* for data only nodes: shard initializing on shard
*/
public class GatewayMetaStateTests extends ElasticsearchAllocationTestCase {
ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) {
//ridiculous settings to make sure we don't run into uninitialized because fo default
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
.build());
ClusterState newClusterState, previousClusterState;
MetaData metaDataOldClusterState = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2))
.build();
RoutingTable routingTableOldClusterState = RoutingTable.builder()
.addAsNew(metaDataOldClusterState.index("test"))
.build();
// assign all shards
ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaDataOldClusterState)
.routingTable(routingTableOldClusterState)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
// new cluster state will have initializing shards on node 1
RoutingTable routingTableNewClusterState = strategy.reroute(init).routingTable();
if (initializing == false) {
// pretend all initialized, nothing happened
ClusterState temp = ClusterState.builder(init).routingTable(routingTableNewClusterState).metaData(metaDataOldClusterState).build();
routingTableNewClusterState = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
routingTableOldClusterState = routingTableNewClusterState;
} else {
// nothing to do, we have one routing table with unassigned and one with initializing
}
// create new meta data either with version changed or not
MetaData metaDataNewClusterState = MetaData.builder()
.put(init.metaData().index("test"), versionChanged)
.build();
// create the cluster states with meta data and routing tables as computed before
previousClusterState = ClusterState.builder(init)
.metaData(metaDataOldClusterState)
.routingTable(routingTableOldClusterState)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
newClusterState = ClusterState.builder(previousClusterState).routingTable(routingTableNewClusterState).metaData(metaDataNewClusterState).version(previousClusterState.getVersion() + 1).build();
ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState);
assertThat(event.state().version(), equalTo(event.previousState().version() + 1));
return event;
}
ClusterChangedEvent generateCloseEvent(boolean masterEligible) {
//ridiculous settings to make sure we don't run into uninitialized because fo default
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
.build());
ClusterState newClusterState, previousClusterState;
MetaData metaDataIndexCreated = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2))
.build();
RoutingTable routingTableIndexCreated = RoutingTable.builder()
.addAsNew(metaDataIndexCreated.index("test"))
.build();
// assign all shards
ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaDataIndexCreated)
.routingTable(routingTableIndexCreated)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
RoutingTable routingTableInitializing = strategy.reroute(init).routingTable();
ClusterState temp = ClusterState.builder(init).routingTable(routingTableInitializing).build();
RoutingTable routingTableStarted = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
// create new meta data either with version changed or not
MetaData metaDataStarted = MetaData.builder()
.put(init.metaData().index("test"), true)
.build();
// create the cluster states with meta data and routing tables as computed before
MetaData metaDataClosed = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE).numberOfShards(5).numberOfReplicas(2)).version(metaDataStarted.version() + 1)
.build();
previousClusterState = ClusterState.builder(init)
.metaData(metaDataStarted)
.routingTable(routingTableStarted)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
newClusterState = ClusterState.builder(previousClusterState)
.routingTable(routingTableIndexCreated)
.metaData(metaDataClosed)
.version(previousClusterState.getVersion() + 1).build();
ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState);
assertThat(event.state().version(), equalTo(event.previousState().version() + 1));
return event;
}
private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) {
Map<String, String> masterNodeAttributes = new HashMap<>();
masterNodeAttributes.put("master", "true");
masterNodeAttributes.put("data", "true");
Map<String, String> dataNodeAttributes = new HashMap<>();
dataNodeAttributes.put("master", "false");
dataNodeAttributes.put("data", "true");
return DiscoveryNodes.builder().put(newNode("node1", masterEligible ? masterNodeAttributes : dataNodeAttributes)).put(newNode("master_node", masterNodeAttributes)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node");
}
public void assertState(ClusterChangedEvent event,
boolean stateInMemory,
boolean expectMetaData) throws Exception {
MetaData inMemoryMetaData = null;
ImmutableSet<String> oldIndicesList = ImmutableSet.of();
if (stateInMemory) {
inMemoryMetaData = event.previousState().metaData();
ImmutableSet.Builder<String> relevantIndices = ImmutableSet.builder();
oldIndicesList = relevantIndices.addAll(GatewayMetaState.getRelevantIndices(event.previousState(), oldIndicesList)).build();
}
Set<String> newIndicesList = GatewayMetaState.getRelevantIndices(event.state(), oldIndicesList);
// third, get the actual write info
Iterator<GatewayMetaState.IndexMetaWriteInfo> indices = GatewayMetaState.resolveStatesToBeWritten(oldIndicesList, newIndicesList, inMemoryMetaData, event.state().metaData()).iterator();
if (expectMetaData) {
assertThat(indices.hasNext(), equalTo(true));
assertThat(indices.next().getNewMetaData().index(), equalTo("test"));
assertThat(indices.hasNext(), equalTo(false));
} else {
assertThat(indices.hasNext(), equalTo(false));
}
}
@Test
public void testVersionChangeIsAlwaysWritten() throws Exception {
// test that version changes are always written
boolean initializing = randomBoolean();
boolean versionChanged = true;
boolean stateInMemory = randomBoolean();
boolean masterEligible = randomBoolean();
boolean expectMetaData = true;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
@Test
public void testNewShardsAlwaysWritten() throws Exception {
// make sure new shards on data only node always written
boolean initializing = true;
boolean versionChanged = randomBoolean();
boolean stateInMemory = randomBoolean();
boolean masterEligible = false;
boolean expectMetaData = true;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
@Test
public void testAllUpToDateNothingWritten() throws Exception {
// make sure state is not written again if we wrote already
boolean initializing = false;
boolean versionChanged = false;
boolean stateInMemory = true;
boolean masterEligible = randomBoolean();
boolean expectMetaData = false;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
@Test
public void testNoWriteIfNothingChanged() throws Exception {
boolean initializing = false;
boolean versionChanged = false;
boolean stateInMemory = true;
boolean masterEligible = randomBoolean();
boolean expectMetaData = false;
ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible);
ClusterChangedEvent newEventWithNothingChanged = new ClusterChangedEvent("test cluster state", event.state(), event.state());
assertState(newEventWithNothingChanged, stateInMemory, expectMetaData);
}
@Test
public void testWriteClosedIndex() throws Exception {
// test that the closing of an index is written also on data only node
boolean masterEligible = randomBoolean();
boolean expectMetaData = true;
boolean stateInMemory = true;
ClusterChangedEvent event = generateCloseEvent(masterEligible);
assertState(event, stateInMemory, expectMetaData);
}
}

View File

@ -1,354 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class MetaDataWriteDataNodesTests extends ElasticsearchIntegrationTest {
@Test
public void testMetaWrittenAlsoOnDataNode() throws Exception {
// this test checks that index state is written on data only nodes
String masterNodeName = startMasterNode();
String redNode = startDataNode("red");
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0)));
index("test", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
waitForConcreteMappingsOnAll("test", "doc", "text");
ensureGreen("test");
assertIndexInMetaState(redNode, "test");
assertIndexInMetaState(masterNodeName, "test");
//stop master node and start again with an empty data folder
((InternalTestCluster) cluster()).stopCurrentMasterNode();
String newMasterNode = startMasterNode();
ensureGreen("test");
// wait for mapping also on master becasue then we can be sure the state was written
waitForConcreteMappingsOnAll("test", "doc", "text");
// check for meta data
assertIndexInMetaState(redNode, "test");
assertIndexInMetaState(newMasterNode, "test");
// check if index and doc is still there
ensureGreen("test");
assertTrue(client().prepareGet("test", "doc", "1").get().isExists());
}
@Test
public void testMetaWrittenOnlyForIndicesOnNodesThatHaveAShard() throws Exception {
// this test checks that the index state is only written to a data only node if they have a shard of that index allocated on the node
String masterNode = startMasterNode();
String blueNode = startDataNode("blue");
String redNode = startDataNode("red");
assertAcked(prepareCreate("blue_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")));
index("blue_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
waitForConcreteMappingsOnAll("blue_index", "doc", "text");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexInMetaState(blueNode, "blue_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
// not the index state for blue_index should only be written on blue_node and the for red_index only on red_node
// we restart red node and master but with empty data folders
stopNode(redNode);
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
redNode = startDataNode("red");
ensureGreen();
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(blueNode, "blue_index");
assertIndexNotInMetaState(redNode, "red_index");
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexNotInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
// check that blue index is still there
assertFalse(client().admin().indices().prepareExists("red_index").get().isExists());
assertTrue(client().prepareGet("blue_index", "doc", "1").get().isExists());
// red index should be gone
// if the blue node had stored the index state then cluster health would be red and red_index would exist
assertFalse(client().admin().indices().prepareExists("red_index").get().isExists());
}
@Test
public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception {
// this test checks that the index state is removed from a data only node once all shards have been allocated away from it
String masterNode = startMasterNode();
String blueNode = startDataNode("blue");
String redNode = startDataNode("red");
// create blue_index on blue_node and same for red
client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("3")).get();
assertAcked(prepareCreate("blue_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")));
index("blue_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(blueNode, "blue_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
// now relocate blue_index to red_node and red_index to blue_node
logger.debug("relocating indices...");
client().admin().indices().prepareUpdateSettings("blue_index").setSettings(ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")).get();
client().admin().indices().prepareUpdateSettings("red_index").setSettings(ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")).get();
client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get();
ensureGreen();
assertIndexNotInMetaState(redNode, "red_index");
assertIndexNotInMetaState(blueNode, "blue_index");
assertIndexInMetaState(redNode, "blue_index");
assertIndexInMetaState(blueNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexInMetaState(masterNode, "blue_index");
waitForConcreteMappingsOnAll("blue_index", "doc", "text");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
//at this point the blue_index is on red node and the red_index on blue node
// now, when we start red and master node again but without data folder, the red index should be gone but the blue index should initialize fine
stopNode(redNode);
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
redNode = startDataNode("red");
ensureGreen();
assertIndexNotInMetaState(redNode, "blue_index");
assertIndexNotInMetaState(blueNode, "blue_index");
assertIndexNotInMetaState(redNode, "red_index");
assertIndexInMetaState(blueNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
assertIndexNotInMetaState(masterNode, "blue_index");
assertTrue(client().prepareGet("red_index", "doc", "1").get().isExists());
// if the red_node had stored the index state then cluster health would be red and blue_index would exist
assertFalse(client().admin().indices().prepareExists("blue_index").get().isExists());
}
@Test
public void testMetaWrittenWhenIndexIsClosed() throws Exception {
String masterNode = startMasterNode();
String redNodeDataPath = createTempDir().toString();
String redNode = startDataNode("red", redNodeDataPath);
String blueNode = startDataNode("blue");
// create red_index on red_node and same for red
client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("3")).get();
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
client().admin().indices().prepareClose("red_index").get();
// close the index
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
// restart master with empty data folder and maybe red node
boolean restartRedNode = randomBoolean();
//at this point the red_index on red node
if (restartRedNode) {
stopNode(redNode);
}
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
if (restartRedNode) {
redNode = startDataNode("red", redNodeDataPath);
}
ensureGreen("red_index");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
// open the index again
client().admin().indices().prepareOpen("red_index").get();
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.OPEN.name()));
// restart again
ensureGreen();
if (restartRedNode) {
stopNode(redNode);
}
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
if (restartRedNode) {
redNode = startDataNode("red", redNodeDataPath);
}
ensureGreen("red_index");
assertIndexNotInMetaState(blueNode, "red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.OPEN.name()));
assertTrue(client().prepareGet("red_index", "doc", "1").get().isExists());
}
@Test
public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception {
String masterNode = startMasterNode();
String redNodeDataPath = createTempDir().toString();
String redNode = startDataNode("red", redNodeDataPath);
// create red_index on red_node and same for red
client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2")).get();
assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")));
index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
logger.info("--> wait for green red_index");
ensureGreen();
logger.info("--> wait for meta state written for red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
waitForConcreteMappingsOnAll("red_index", "doc", "text");
logger.info("--> close red_index");
client().admin().indices().prepareClose("red_index").get();
// close the index
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
logger.info("--> restart red node");
stopNode(redNode);
redNode = startDataNode("red", redNodeDataPath);
client().admin().indices().preparePutMapping("red_index").setType("doc").setSource(jsonBuilder().startObject()
.startObject("properties")
.startObject("integer_field")
.field("type", "integer")
.endObject()
.endObject()
.endObject()).get();
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("red_index").addTypes("doc").get();
assertNotNull(((LinkedHashMap)(getMappingsResponse.getMappings().get("red_index").get("doc").getSourceAsMap().get("properties"))).get("integer_field"));
// restart master with empty data folder and maybe red node
((InternalTestCluster) cluster()).stopCurrentMasterNode();
masterNode = startMasterNode();
ensureGreen("red_index");
assertIndexInMetaState(redNode, "red_index");
assertIndexInMetaState(masterNode, "red_index");
clusterStateResponse = client().admin().cluster().prepareState().get();
assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name()));
getMappingsResponse = client().admin().indices().prepareGetMappings("red_index").addTypes("doc").get();
assertNotNull(((LinkedHashMap)(getMappingsResponse.getMappings().get("red_index").get("doc").getSourceAsMap().get("properties"))).get("integer_field"));
}
private String startDataNode(String color) {
return startDataNode(color, createTempDir().toString());
}
private String startDataNode(String color, String newDataPath) {
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder()
.put("node.data", true)
.put("node.master", false)
.put("node.color", color)
.put("path.data", newDataPath);
return internalCluster().startNode(settingsBuilder.build());
}
private String startMasterNode() {
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder()
.put("node.data", false)
.put("node.master", true)
.put("path.data", createTempDir().toString());
return internalCluster().startNode(settingsBuilder.build());
}
private void stopNode(String name) throws IOException {
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(name));
}
protected void assertIndexNotInMetaState(String nodeName, String indexName) throws Exception {
assertMetaState(nodeName, indexName, false);
}
protected void assertIndexInMetaState(String nodeName, String indexName) throws Exception {
assertMetaState(nodeName, indexName, true);
}
private void assertMetaState(final String nodeName, final String indexName, final boolean shouldBe) throws Exception {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
logger.info("checking if meta state exists...");
return shouldBe == metaStateExists(nodeName, indexName);
}
});
boolean inMetaSate = metaStateExists(nodeName, indexName);
if (shouldBe) {
assertTrue("expected " + indexName + " in meta state of node " + nodeName, inMetaSate);
} else {
assertFalse("expected " + indexName + " to not be in meta state of node " + nodeName, inMetaSate);
}
}
private boolean metaStateExists(String nodeName, String indexName) {
GatewayMetaState redNodeMetaState = ((InternalTestCluster) cluster()).getInstance(GatewayMetaState.class, nodeName);
MetaData redNodeMetaData = null;
try {
redNodeMetaData = redNodeMetaState.loadMetaState();
} catch (Exception e) {
fail("failed to load meta state");
}
ImmutableOpenMap<String, IndexMetaData> indices = redNodeMetaData.getIndices();
boolean inMetaSate = false;
for (ObjectObjectCursor<String, IndexMetaData> index : indices) {
inMetaSate = inMetaSate || index.key.equals(indexName);
}
return inMetaSate;
}
}