From 9b76be92b3f1f9ea735ee9009661379a24065294 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 29 Apr 2015 10:53:16 -0400 Subject: [PATCH 1/2] Docs: add notes about using close and awaitClose with bulk processor Closes #10839 --- docs/java-api/bulk.asciidoc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/java-api/bulk.asciidoc b/docs/java-api/bulk.asciidoc index 9ac61f47f30..96b0b2eb6dc 100644 --- a/docs/java-api/bulk.asciidoc +++ b/docs/java-api/bulk.asciidoc @@ -99,3 +99,22 @@ By default, `BulkProcessor`: * does not set flushInterval * sets concurrentRequests to 1 +When all documents are loaded to the `BulkProcessor` it can be closed by using `awaitClose` or `close` methods: + +[source,java] +-------------------------------------------------- +bulkProcessor.awaitClose(10, TimeUnit.MINUTES); +-------------------------------------------------- + +or + +[source,java] +-------------------------------------------------- +bulkProcessor.close(); +-------------------------------------------------- + +Both methods flush any remaining documents and disable all other scheduled flushes if they were scheduled by setting +`flushInterval`. If concurrent requests were enabled the `awaitClose` method waits for up to the specified timeout for +all bulk requests to complete then returns `true`, if the specified waiting time elapses before all bulk requests complete, +`false` is returned. The `close` method doesn't wait for any remaining bulk requests to complete and exists immediately. + From a202c2a43489aa83c10934e68f1981265ebbb3c6 Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Wed, 29 Apr 2015 17:06:43 +0200 Subject: [PATCH 2/2] Revert "Write state also on data nodes if not master eligible" This reverts commit 4088dd38cbff19462e610db853ba1e54ee9785e4. --- .../gateway/GatewayMetaState.java | 176 ++------- .../gateway/GatewayMetaStateTests.java | 249 ------------ .../gateway/MetaDataWriteDataNodesTests.java | 354 ------------------ 3 files changed, 36 insertions(+), 743 deletions(-) delete mode 100644 src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java delete mode 100644 src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesTests.java diff --git a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index ca8edebc571..158a3df5d91 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -19,7 +19,6 @@ package org.elasticsearch.gateway; -import com.google.common.collect.ImmutableSet; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -28,7 +27,9 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.DjbHashFunction; +import org.elasticsearch.cluster.routing.HashFunction; +import org.elasticsearch.cluster.routing.SimpleHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -42,7 +43,6 @@ import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; /** * @@ -57,9 +57,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL private final DanglingIndicesState danglingIndicesState; @Nullable - private volatile MetaData previousMetaData; - - private volatile ImmutableSet previouslyWrittenIndices = ImmutableSet.of(); + private volatile MetaData currentMetaData; @Inject public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService, @@ -78,7 +76,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) { nodeEnv.ensureAtomicMoveSupported(); } - if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) { + if (DiscoveryNode.masterNode(settings)) { try { ensureNoPre019State(); pre20Upgrade(); @@ -98,12 +96,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL @Override public void clusterChanged(ClusterChangedEvent event) { - Set relevantIndices = new HashSet<>(); final ClusterState state = event.state(); if (state.blocks().disableStatePersistence()) { // reset the current metadata, we need to start fresh... - this.previousMetaData = null; - previouslyWrittenIndices= ImmutableSet.of(); + this.currentMetaData = null; return; } @@ -111,47 +107,44 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL // we don't check if metaData changed, since we might be called several times and we need to check dangling... boolean success = true; - // write the state if this node is a master eligible node or if it is a data node and has shards allocated on it - if (state.nodes().localNode().masterNode() || state.nodes().localNode().dataNode()) { + // only applied to master node, writing the global and index level states + if (state.nodes().localNode().masterNode()) { // check if the global state changed? - if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) { + if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) { try { metaStateService.writeGlobalState("changed", newMetaData); - // we determine if or if not we write meta data on data only nodes by looking at the shard routing - // and only write if a shard of this index is allocated on this node - // however, closed indices do not appear in the shard routing. if the meta data for a closed index is - // updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state - // persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode(). - // we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list - if (isDataOnlyNode(state)) { - ImmutableSet.Builder previouslyWrittenIndicesBuilder = ImmutableSet.builder(); - for (IndexMetaData indexMetaData : newMetaData) { - IndexMetaData indexMetaDataOnDisk = null; - if (indexMetaData.state().equals(IndexMetaData.State.CLOSE)) { - try { - indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.index()); - } catch (IOException ex) { - throw new ElasticsearchException("failed to load index state", ex); - } - } - if (indexMetaDataOnDisk != null) { - previouslyWrittenIndicesBuilder.add(indexMetaDataOnDisk.index()); - } - } - previouslyWrittenIndices = previouslyWrittenIndicesBuilder.addAll(previouslyWrittenIndices).build(); - } } catch (Throwable e) { success = false; } } - Iterable writeInfo; - relevantIndices = getRelevantIndices(event.state(), previouslyWrittenIndices); - writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData()); // check and write changes in indices - for (IndexMetaWriteInfo indexMetaWrite : writeInfo) { + for (IndexMetaData indexMetaData : newMetaData) { + String writeReason = null; + IndexMetaData currentIndexMetaData; + if (currentMetaData == null) { + // a new event..., check from the state stored + try { + currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index()); + } catch (IOException ex) { + throw new ElasticsearchException("failed to load index state", ex); + } + } else { + currentIndexMetaData = currentMetaData.index(indexMetaData.index()); + } + if (currentIndexMetaData == null) { + writeReason = "freshly created"; + } else if (currentIndexMetaData.version() != indexMetaData.version()) { + writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]"; + } + + // we update the writeReason only if we really need to write it + if (writeReason == null) { + continue; + } + try { - metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData); + metaStateService.writeIndex(writeReason, indexMetaData, currentIndexMetaData); } catch (Throwable e) { success = false; } @@ -161,29 +154,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL danglingIndicesState.processDanglingIndices(newMetaData); if (success) { - previousMetaData = newMetaData; - ImmutableSet.Builder builder= ImmutableSet.builder(); - previouslyWrittenIndices = builder.addAll(relevantIndices).build(); + currentMetaData = newMetaData; } } - public static Set getRelevantIndices(ClusterState state, ImmutableSet previouslyWrittenIndices) { - Set relevantIndices; - if (isDataOnlyNode(state)) { - relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previouslyWrittenIndices); - } else if (state.nodes().localNode().masterNode() == true) { - relevantIndices = getRelevantIndicesForMasterEligibleNode(state); - } else { - relevantIndices = Collections.emptySet(); - } - return relevantIndices; - } - - - protected static boolean isDataOnlyNode(ClusterState state) { - return ((state.nodes().localNode().masterNode() == false) && state.nodes().localNode().dataNode()); - } - /** * Throws an IAE if a pre 0.19 state is detected */ @@ -255,7 +229,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL } } } - if (hasCustomPre20HashFunction || pre20UseType != null) { + if (hasCustomPre20HashFunction|| pre20UseType != null) { logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they " + "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE); } @@ -277,82 +251,4 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL } } } - - /** - * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted. - * Each index state that should be written to disk will be returned. This is only run for data only nodes. - * It will return only the states for indices that actually have a shard allocated on the current node. - * - * @param previouslyWrittenIndices A list of indices for which the state was already written before - * @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written - * @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now - * @param newMetaData The new metadata - * @return iterable over all indices states that should be written to disk - */ - public static Iterable resolveStatesToBeWritten(ImmutableSet previouslyWrittenIndices, Set potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) { - List indicesToWrite = new ArrayList<>(); - for (String index : potentiallyUnwrittenIndices) { - IndexMetaData newIndexMetaData = newMetaData.index(index); - IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index); - String writeReason = null; - if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) { - writeReason = "freshly created"; - } else if (previousIndexMetaData.version() != newIndexMetaData.version()) { - writeReason = "version changed from [" + previousIndexMetaData.version() + "] to [" + newIndexMetaData.version() + "]"; - } - if (writeReason != null) { - indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason)); - } - } - return indicesToWrite; - } - - public static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ImmutableSet previouslyWrittenIndices) { - RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId()); - if (newRoutingNode == null) { - throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); - } - Set indices = new HashSet<>(); - for (MutableShardRouting routing : newRoutingNode) { - indices.add(routing.index()); - } - // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously - for (IndexMetaData indexMetaData : state.metaData()) { - if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && state.metaData().getIndices().get(indexMetaData.getIndex()).state().equals(IndexMetaData.State.CLOSE)) { - indices.add(indexMetaData.getIndex()); - } - } - return indices; - } - - public static Set getRelevantIndicesForMasterEligibleNode(ClusterState state) { - Set relevantIndices; - relevantIndices = new HashSet<>(); - // we have to iterate over the metadata to make sure we also capture closed indices - for (IndexMetaData indexMetaData : state.metaData()) { - relevantIndices.add(indexMetaData.getIndex()); - } - return relevantIndices; - } - - - public static class IndexMetaWriteInfo { - final IndexMetaData newMetaData; - final String reason; - final IndexMetaData previousMetaData; - - public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) { - this.newMetaData = newMetaData; - this.reason = reason; - this.previousMetaData = previousMetaData; - } - - public IndexMetaData getNewMetaData() { - return newMetaData; - } - - public String getReason() { - return reason; - } - } } diff --git a/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java deleted file mode 100644 index 06b958d47aa..00000000000 --- a/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.gateway; - -import com.google.common.collect.ImmutableSet; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; -import org.elasticsearch.test.ElasticsearchAllocationTestCase; -import org.junit.Test; - -import java.util.*; - -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.hamcrest.Matchers.equalTo; - -/** - * Test IndexMetaState for master and data only nodes return correct list of indices to write - * There are many parameters: - * - meta state is not in memory - * - meta state is in memory with old version/ new version - * - meta state is in memory with new version - * - version changed in cluster state event/ no change - * - node is data only node - * - node is master eligible - * for data only nodes: shard initializing on shard - */ -public class GatewayMetaStateTests extends ElasticsearchAllocationTestCase { - - ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) { - //ridiculous settings to make sure we don't run into uninitialized because fo default - AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 100) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) - .build()); - ClusterState newClusterState, previousClusterState; - MetaData metaDataOldClusterState = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2)) - .build(); - - RoutingTable routingTableOldClusterState = RoutingTable.builder() - .addAsNew(metaDataOldClusterState.index("test")) - .build(); - - // assign all shards - ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) - .metaData(metaDataOldClusterState) - .routingTable(routingTableOldClusterState) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - // new cluster state will have initializing shards on node 1 - RoutingTable routingTableNewClusterState = strategy.reroute(init).routingTable(); - if (initializing == false) { - // pretend all initialized, nothing happened - ClusterState temp = ClusterState.builder(init).routingTable(routingTableNewClusterState).metaData(metaDataOldClusterState).build(); - routingTableNewClusterState = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); - routingTableOldClusterState = routingTableNewClusterState; - - } else { - // nothing to do, we have one routing table with unassigned and one with initializing - } - - // create new meta data either with version changed or not - MetaData metaDataNewClusterState = MetaData.builder() - .put(init.metaData().index("test"), versionChanged) - .build(); - - - // create the cluster states with meta data and routing tables as computed before - previousClusterState = ClusterState.builder(init) - .metaData(metaDataOldClusterState) - .routingTable(routingTableOldClusterState) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - newClusterState = ClusterState.builder(previousClusterState).routingTable(routingTableNewClusterState).metaData(metaDataNewClusterState).version(previousClusterState.getVersion() + 1).build(); - - ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState); - assertThat(event.state().version(), equalTo(event.previousState().version() + 1)); - return event; - } - - ClusterChangedEvent generateCloseEvent(boolean masterEligible) { - //ridiculous settings to make sure we don't run into uninitialized because fo default - AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 100) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) - .build()); - ClusterState newClusterState, previousClusterState; - MetaData metaDataIndexCreated = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2)) - .build(); - - RoutingTable routingTableIndexCreated = RoutingTable.builder() - .addAsNew(metaDataIndexCreated.index("test")) - .build(); - - // assign all shards - ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) - .metaData(metaDataIndexCreated) - .routingTable(routingTableIndexCreated) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - RoutingTable routingTableInitializing = strategy.reroute(init).routingTable(); - ClusterState temp = ClusterState.builder(init).routingTable(routingTableInitializing).build(); - RoutingTable routingTableStarted = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); - - // create new meta data either with version changed or not - MetaData metaDataStarted = MetaData.builder() - .put(init.metaData().index("test"), true) - .build(); - - // create the cluster states with meta data and routing tables as computed before - MetaData metaDataClosed = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE).numberOfShards(5).numberOfReplicas(2)).version(metaDataStarted.version() + 1) - .build(); - previousClusterState = ClusterState.builder(init) - .metaData(metaDataStarted) - .routingTable(routingTableStarted) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - newClusterState = ClusterState.builder(previousClusterState) - .routingTable(routingTableIndexCreated) - .metaData(metaDataClosed) - .version(previousClusterState.getVersion() + 1).build(); - - ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState); - assertThat(event.state().version(), equalTo(event.previousState().version() + 1)); - return event; - } - - private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) { - Map masterNodeAttributes = new HashMap<>(); - masterNodeAttributes.put("master", "true"); - masterNodeAttributes.put("data", "true"); - Map dataNodeAttributes = new HashMap<>(); - dataNodeAttributes.put("master", "false"); - dataNodeAttributes.put("data", "true"); - return DiscoveryNodes.builder().put(newNode("node1", masterEligible ? masterNodeAttributes : dataNodeAttributes)).put(newNode("master_node", masterNodeAttributes)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node"); - } - - public void assertState(ClusterChangedEvent event, - boolean stateInMemory, - boolean expectMetaData) throws Exception { - MetaData inMemoryMetaData = null; - ImmutableSet oldIndicesList = ImmutableSet.of(); - if (stateInMemory) { - inMemoryMetaData = event.previousState().metaData(); - ImmutableSet.Builder relevantIndices = ImmutableSet.builder(); - oldIndicesList = relevantIndices.addAll(GatewayMetaState.getRelevantIndices(event.previousState(), oldIndicesList)).build(); - } - Set newIndicesList = GatewayMetaState.getRelevantIndices(event.state(), oldIndicesList); - // third, get the actual write info - Iterator indices = GatewayMetaState.resolveStatesToBeWritten(oldIndicesList, newIndicesList, inMemoryMetaData, event.state().metaData()).iterator(); - - if (expectMetaData) { - assertThat(indices.hasNext(), equalTo(true)); - assertThat(indices.next().getNewMetaData().index(), equalTo("test")); - assertThat(indices.hasNext(), equalTo(false)); - } else { - assertThat(indices.hasNext(), equalTo(false)); - } - } - - @Test - public void testVersionChangeIsAlwaysWritten() throws Exception { - // test that version changes are always written - boolean initializing = randomBoolean(); - boolean versionChanged = true; - boolean stateInMemory = randomBoolean(); - boolean masterEligible = randomBoolean(); - boolean expectMetaData = true; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - assertState(event, stateInMemory, expectMetaData); - } - - @Test - public void testNewShardsAlwaysWritten() throws Exception { - // make sure new shards on data only node always written - boolean initializing = true; - boolean versionChanged = randomBoolean(); - boolean stateInMemory = randomBoolean(); - boolean masterEligible = false; - boolean expectMetaData = true; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - assertState(event, stateInMemory, expectMetaData); - } - - @Test - public void testAllUpToDateNothingWritten() throws Exception { - // make sure state is not written again if we wrote already - boolean initializing = false; - boolean versionChanged = false; - boolean stateInMemory = true; - boolean masterEligible = randomBoolean(); - boolean expectMetaData = false; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - assertState(event, stateInMemory, expectMetaData); - } - - @Test - public void testNoWriteIfNothingChanged() throws Exception { - boolean initializing = false; - boolean versionChanged = false; - boolean stateInMemory = true; - boolean masterEligible = randomBoolean(); - boolean expectMetaData = false; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - ClusterChangedEvent newEventWithNothingChanged = new ClusterChangedEvent("test cluster state", event.state(), event.state()); - assertState(newEventWithNothingChanged, stateInMemory, expectMetaData); - } - - @Test - public void testWriteClosedIndex() throws Exception { - // test that the closing of an index is written also on data only node - boolean masterEligible = randomBoolean(); - boolean expectMetaData = true; - boolean stateInMemory = true; - ClusterChangedEvent event = generateCloseEvent(masterEligible); - assertState(event, stateInMemory, expectMetaData); - } -} diff --git a/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesTests.java b/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesTests.java deleted file mode 100644 index 7947a6698c7..00000000000 --- a/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesTests.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.gateway; - -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import com.google.common.base.Predicate; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.InternalTestCluster; -import org.junit.Test; - -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; - -import static org.elasticsearch.client.Requests.clusterHealthRequest; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; - -/** - * - */ -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) -public class MetaDataWriteDataNodesTests extends ElasticsearchIntegrationTest { - - @Test - public void testMetaWrittenAlsoOnDataNode() throws Exception { - // this test checks that index state is written on data only nodes - String masterNodeName = startMasterNode(); - String redNode = startDataNode("red"); - assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0))); - index("test", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - waitForConcreteMappingsOnAll("test", "doc", "text"); - ensureGreen("test"); - assertIndexInMetaState(redNode, "test"); - assertIndexInMetaState(masterNodeName, "test"); - //stop master node and start again with an empty data folder - ((InternalTestCluster) cluster()).stopCurrentMasterNode(); - String newMasterNode = startMasterNode(); - ensureGreen("test"); - // wait for mapping also on master becasue then we can be sure the state was written - waitForConcreteMappingsOnAll("test", "doc", "text"); - // check for meta data - assertIndexInMetaState(redNode, "test"); - assertIndexInMetaState(newMasterNode, "test"); - // check if index and doc is still there - ensureGreen("test"); - assertTrue(client().prepareGet("test", "doc", "1").get().isExists()); - } - - @Test - public void testMetaWrittenOnlyForIndicesOnNodesThatHaveAShard() throws Exception { - // this test checks that the index state is only written to a data only node if they have a shard of that index allocated on the node - String masterNode = startMasterNode(); - String blueNode = startDataNode("blue"); - String redNode = startDataNode("red"); - - assertAcked(prepareCreate("blue_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue"))); - index("blue_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red"))); - index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - ensureGreen(); - waitForConcreteMappingsOnAll("blue_index", "doc", "text"); - waitForConcreteMappingsOnAll("red_index", "doc", "text"); - assertIndexNotInMetaState(blueNode, "red_index"); - assertIndexNotInMetaState(redNode, "blue_index"); - assertIndexInMetaState(blueNode, "blue_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - assertIndexInMetaState(masterNode, "blue_index"); - - // not the index state for blue_index should only be written on blue_node and the for red_index only on red_node - // we restart red node and master but with empty data folders - stopNode(redNode); - ((InternalTestCluster) cluster()).stopCurrentMasterNode(); - masterNode = startMasterNode(); - redNode = startDataNode("red"); - - ensureGreen(); - assertIndexNotInMetaState(blueNode, "red_index"); - assertIndexInMetaState(blueNode, "blue_index"); - assertIndexNotInMetaState(redNode, "red_index"); - assertIndexNotInMetaState(redNode, "blue_index"); - assertIndexNotInMetaState(masterNode, "red_index"); - assertIndexInMetaState(masterNode, "blue_index"); - // check that blue index is still there - assertFalse(client().admin().indices().prepareExists("red_index").get().isExists()); - assertTrue(client().prepareGet("blue_index", "doc", "1").get().isExists()); - // red index should be gone - // if the blue node had stored the index state then cluster health would be red and red_index would exist - assertFalse(client().admin().indices().prepareExists("red_index").get().isExists()); - - } - - @Test - public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception { - // this test checks that the index state is removed from a data only node once all shards have been allocated away from it - String masterNode = startMasterNode(); - String blueNode = startDataNode("blue"); - String redNode = startDataNode("red"); - - // create blue_index on blue_node and same for red - client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("3")).get(); - assertAcked(prepareCreate("blue_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue"))); - index("blue_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red"))); - index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - - ensureGreen(); - assertIndexNotInMetaState(redNode, "blue_index"); - assertIndexNotInMetaState(blueNode, "red_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(blueNode, "blue_index"); - assertIndexInMetaState(masterNode, "red_index"); - assertIndexInMetaState(masterNode, "blue_index"); - - // now relocate blue_index to red_node and red_index to blue_node - logger.debug("relocating indices..."); - client().admin().indices().prepareUpdateSettings("blue_index").setSettings(ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")).get(); - client().admin().indices().prepareUpdateSettings("red_index").setSettings(ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")).get(); - client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get(); - ensureGreen(); - assertIndexNotInMetaState(redNode, "red_index"); - assertIndexNotInMetaState(blueNode, "blue_index"); - assertIndexInMetaState(redNode, "blue_index"); - assertIndexInMetaState(blueNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - assertIndexInMetaState(masterNode, "blue_index"); - waitForConcreteMappingsOnAll("blue_index", "doc", "text"); - waitForConcreteMappingsOnAll("red_index", "doc", "text"); - - //at this point the blue_index is on red node and the red_index on blue node - // now, when we start red and master node again but without data folder, the red index should be gone but the blue index should initialize fine - stopNode(redNode); - ((InternalTestCluster) cluster()).stopCurrentMasterNode(); - masterNode = startMasterNode(); - redNode = startDataNode("red"); - ensureGreen(); - assertIndexNotInMetaState(redNode, "blue_index"); - assertIndexNotInMetaState(blueNode, "blue_index"); - assertIndexNotInMetaState(redNode, "red_index"); - assertIndexInMetaState(blueNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - assertIndexNotInMetaState(masterNode, "blue_index"); - assertTrue(client().prepareGet("red_index", "doc", "1").get().isExists()); - // if the red_node had stored the index state then cluster health would be red and blue_index would exist - assertFalse(client().admin().indices().prepareExists("blue_index").get().isExists()); - } - - @Test - public void testMetaWrittenWhenIndexIsClosed() throws Exception { - String masterNode = startMasterNode(); - String redNodeDataPath = createTempDir().toString(); - String redNode = startDataNode("red", redNodeDataPath); - String blueNode = startDataNode("blue"); - // create red_index on red_node and same for red - client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("3")).get(); - assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red"))); - index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - - ensureGreen(); - assertIndexNotInMetaState(blueNode, "red_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - - waitForConcreteMappingsOnAll("red_index", "doc", "text"); - client().admin().indices().prepareClose("red_index").get(); - // close the index - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); - assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name())); - - // restart master with empty data folder and maybe red node - boolean restartRedNode = randomBoolean(); - //at this point the red_index on red node - if (restartRedNode) { - stopNode(redNode); - } - ((InternalTestCluster) cluster()).stopCurrentMasterNode(); - masterNode = startMasterNode(); - if (restartRedNode) { - redNode = startDataNode("red", redNodeDataPath); - } - - ensureGreen("red_index"); - assertIndexNotInMetaState(blueNode, "red_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - clusterStateResponse = client().admin().cluster().prepareState().get(); - assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name())); - - // open the index again - client().admin().indices().prepareOpen("red_index").get(); - clusterStateResponse = client().admin().cluster().prepareState().get(); - assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.OPEN.name())); - // restart again - ensureGreen(); - if (restartRedNode) { - stopNode(redNode); - } - ((InternalTestCluster) cluster()).stopCurrentMasterNode(); - masterNode = startMasterNode(); - if (restartRedNode) { - redNode = startDataNode("red", redNodeDataPath); - } - ensureGreen("red_index"); - assertIndexNotInMetaState(blueNode, "red_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - clusterStateResponse = client().admin().cluster().prepareState().get(); - assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.OPEN.name())); - assertTrue(client().prepareGet("red_index", "doc", "1").get().isExists()); - } - @Test - public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception { - String masterNode = startMasterNode(); - String redNodeDataPath = createTempDir().toString(); - String redNode = startDataNode("red", redNodeDataPath); - // create red_index on red_node and same for red - client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2")).get(); - assertAcked(prepareCreate("red_index").setSettings(ImmutableSettings.builder().put("index.number_of_replicas", 0).put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red"))); - index("red_index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); - - logger.info("--> wait for green red_index"); - ensureGreen(); - logger.info("--> wait for meta state written for red_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - - waitForConcreteMappingsOnAll("red_index", "doc", "text"); - - logger.info("--> close red_index"); - client().admin().indices().prepareClose("red_index").get(); - // close the index - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); - assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name())); - - logger.info("--> restart red node"); - stopNode(redNode); - redNode = startDataNode("red", redNodeDataPath); - client().admin().indices().preparePutMapping("red_index").setType("doc").setSource(jsonBuilder().startObject() - .startObject("properties") - .startObject("integer_field") - .field("type", "integer") - .endObject() - .endObject() - .endObject()).get(); - - GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("red_index").addTypes("doc").get(); - assertNotNull(((LinkedHashMap)(getMappingsResponse.getMappings().get("red_index").get("doc").getSourceAsMap().get("properties"))).get("integer_field")); - // restart master with empty data folder and maybe red node - ((InternalTestCluster) cluster()).stopCurrentMasterNode(); - masterNode = startMasterNode(); - - ensureGreen("red_index"); - assertIndexInMetaState(redNode, "red_index"); - assertIndexInMetaState(masterNode, "red_index"); - clusterStateResponse = client().admin().cluster().prepareState().get(); - assertThat(clusterStateResponse.getState().getMetaData().index("red_index").getState().name(), equalTo(IndexMetaData.State.CLOSE.name())); - getMappingsResponse = client().admin().indices().prepareGetMappings("red_index").addTypes("doc").get(); - assertNotNull(((LinkedHashMap)(getMappingsResponse.getMappings().get("red_index").get("doc").getSourceAsMap().get("properties"))).get("integer_field")); - - } - - private String startDataNode(String color) { - return startDataNode(color, createTempDir().toString()); - } - - private String startDataNode(String color, String newDataPath) { - ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder() - .put("node.data", true) - .put("node.master", false) - .put("node.color", color) - .put("path.data", newDataPath); - return internalCluster().startNode(settingsBuilder.build()); - } - - private String startMasterNode() { - ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder() - .put("node.data", false) - .put("node.master", true) - .put("path.data", createTempDir().toString()); - return internalCluster().startNode(settingsBuilder.build()); - } - - private void stopNode(String name) throws IOException { - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(name)); - } - - protected void assertIndexNotInMetaState(String nodeName, String indexName) throws Exception { - assertMetaState(nodeName, indexName, false); - } - - protected void assertIndexInMetaState(String nodeName, String indexName) throws Exception { - assertMetaState(nodeName, indexName, true); - } - - private void assertMetaState(final String nodeName, final String indexName, final boolean shouldBe) throws Exception { - awaitBusy(new Predicate() { - @Override - public boolean apply(Object o) { - logger.info("checking if meta state exists..."); - return shouldBe == metaStateExists(nodeName, indexName); - } - }); - boolean inMetaSate = metaStateExists(nodeName, indexName); - if (shouldBe) { - assertTrue("expected " + indexName + " in meta state of node " + nodeName, inMetaSate); - } else { - assertFalse("expected " + indexName + " to not be in meta state of node " + nodeName, inMetaSate); - } - } - - private boolean metaStateExists(String nodeName, String indexName) { - GatewayMetaState redNodeMetaState = ((InternalTestCluster) cluster()).getInstance(GatewayMetaState.class, nodeName); - MetaData redNodeMetaData = null; - try { - redNodeMetaData = redNodeMetaState.loadMetaState(); - } catch (Exception e) { - fail("failed to load meta state"); - } - ImmutableOpenMap indices = redNodeMetaData.getIndices(); - boolean inMetaSate = false; - for (ObjectObjectCursor index : indices) { - inMetaSate = inMetaSate || index.key.equals(indexName); - } - return inMetaSate; - } -}