/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestCustomMetadata;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
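/**
 * Unit tests for {@link ClusterState}: builder validation, supersedes semantics, and
 * XContent (JSON) rendering under the various serialization parameters.
 */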
public class ClusterStateTests extends ESTestCase {
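/**
 * A cluster state supersedes another only when both were published by the same master node
 * and it carries the higher version; states without a master never supersede anything.
 */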
public void testSupersedes() {
final Version version = Version.CURRENT;
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version);
final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version);
final DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).build();
ClusterName name = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
ClusterState noMaster1 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build();
ClusterState noMaster2 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build();
ClusterState withMaster1a = ClusterState.builder(name).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes)
.masterNodeId(node1.getId())).build();
ClusterState withMaster1b = ClusterState.builder(name).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes)
.masterNodeId(node1.getId())).build();
ClusterState withMaster2 = ClusterState.builder(name).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes)
.masterNodeId(node2.getId())).build();
// states with no master should never supersede anything
assertFalse(noMaster1.supersedes(noMaster2));
assertFalse(noMaster1.supersedes(withMaster1a));
// states should never supersede states from another master
assertFalse(withMaster1a.supersedes(withMaster2));
assertFalse(withMaster1a.supersedes(noMaster1));
// states from the same master compare by version
assertThat(withMaster1a.supersedes(withMaster1b), equalTo(withMaster1a.version() > withMaster1b.version()));
}
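// The next two tests verify that the builder rejects null custom values eagerly, naming the offending key.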
public void testBuilderRejectsNullCustom() {
final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT);
final String key = randomAlphaOfLength(10);
assertThat(expectThrows(NullPointerException.class, () -> builder.putCustom(key, null)).getMessage(), containsString(key));
}
public void testBuilderRejectsNullInCustoms() {
final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT);
final String key = randomAlphaOfLength(10);
final ImmutableOpenMap.Builder<String, ClusterState.Custom> mapBuilder = ImmutableOpenMap.builder();
mapBuilder.put(key, null);
final ImmutableOpenMap<String, ClusterState.Custom> map = mapBuilder.build();
assertThat(expectThrows(NullPointerException.class, () -> builder.customs(map)).getMessage(), containsString(key));
}
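/**
 * Renders a fully populated cluster state to pretty-printed JSON with default parameters and
 * compares it, character for character, with the expected output. Ephemeral and allocation ids
 * are generated randomly, so they are read back from the built state and spliced into the
 * expected string.
 */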
public void testToXContent() throws IOException {
final ClusterState clusterState = buildClusterState();
IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index");
String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId();
String allocationId = index.getShards().get(1).getAllAllocationIds().iterator().next();
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
builder.startObject();
clusterState.toXContent(builder, new ToXContent.MapParams(singletonMap(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API)));
builder.endObject();
assertEquals("{\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"version\" : 0,\n" +
" \"state_uuid\" : \"stateUUID\",\n" +
" \"master_node\" : \"masterNodeId\",\n" +
" \"blocks\" : {\n" +
" \"global\" : {\n" +
" \"1\" : {\n" +
" \"description\" : \"description\",\n" +
" \"retryable\" : true,\n" +
" \"disable_state_persistence\" : true,\n" +
" \"levels\" : [\n" +
" \"read\",\n" +
" \"write\",\n" +
" \"metadata_read\",\n" +
" \"metadata_write\"\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"2\" : {\n" +
" \"description\" : \"description2\",\n" +
" \"retryable\" : false,\n" +
" \"levels\" : [\n" +
" \"read\",\n" +
" \"write\",\n" +
" \"metadata_read\",\n" +
" \"metadata_write\"\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"nodes\" : {\n" +
" \"nodeId1\" : {\n" +
" \"name\" : \"\",\n" +
" \"ephemeral_id\" : \"" + ephemeralId + "\",\n" +
" \"transport_address\" : \"127.0.0.1:111\",\n" +
" \"attributes\" : { }\n" +
" }\n" +
" },\n" +
" \"metadata\" : {\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"cluster_uuid_committed\" : false,\n" +
" \"cluster_coordination\" : {\n" +
" \"term\" : 1,\n" +
" \"last_committed_config\" : [\n" +
" \"commitedConfigurationNodeId\"\n" +
" ],\n" +
" \"last_accepted_config\" : [\n" +
" \"acceptedConfigurationNodeId\"\n" +
" ],\n" +
" \"voting_config_exclusions\" : [\n" +
" {\n" +
" \"node_id\" : \"exlucdedNodeId\",\n" +
" \"node_name\" : \"excludedNodeName\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"templates\" : {\n" +
" \"template\" : {\n" +
" \"order\" : 0,\n" +
" \"index_patterns\" : [\n" +
" \"pattern1\",\n" +
" \"pattern2\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"version\" : {\n" +
" \"created\" : \"" + Version.CURRENT.id + "\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"key1\" : { }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }\n" +
" },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"version\" : 1,\n" +
" \"mapping_version\" : 1,\n" +
" \"settings_version\" : 1,\n" +
" \"aliases_version\" : 1,\n" +
" \"routing_num_shards\" : 1,\n" +
" \"state\" : \"open\",\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"1\",\n" +
" \"number_of_replicas\" : \"2\",\n" +
" \"version\" : {\n" +
" \"created\" : \"" + Version.CURRENT.id + "\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"type1\" : {\n" +
" \"key\" : \"value\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : [\n" +
" \"alias\"\n" +
" ],\n" +
" \"primary_terms\" : {\n" +
" \"0\" : 1\n" +
" },\n" +
" \"in_sync_allocations\" : {\n" +
" \"0\" : [\n" +
" \"allocationId\"\n" +
" ]\n" +
" },\n" +
" \"rollover_info\" : {\n" +
" \"rolloveAlias\" : {\n" +
" \"met_conditions\" : { },\n" +
" \"time\" : 1\n" +
" }\n" +
" },\n" +
" \"system\" : false\n" +
" }\n" +
" },\n" +
" \"index-graveyard\" : {\n" +
" \"tombstones\" : [ ]\n" +
" }\n" +
" },\n" +
" \"routing_table\" : {\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"shards\" : {\n" +
" \"1\" : [\n" +
" {\n" +
" \"state\" : \"STARTED\",\n" +
" \"primary\" : true,\n" +
" \"node\" : \"nodeId2\",\n" +
" \"relocating_node\" : null,\n" +
" \"shard\" : 1,\n" +
" \"index\" : \"index\",\n" +
" \"allocation_id\" : {\n" +
" \"id\" : \"" + allocationId + "\"\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"routing_nodes\" : {\n" +
" \"unassigned\" : [ ],\n" +
" \"nodes\" : {\n" +
" \"nodeId2\" : [\n" +
" {\n" +
" \"state\" : \"STARTED\",\n" +
" \"primary\" : true,\n" +
" \"node\" : \"nodeId2\",\n" +
" \"relocating_node\" : null,\n" +
" \"shard\" : 1,\n" +
" \"index\" : \"index\",\n" +
" \"allocation_id\" : {\n" +
" \"id\" : \"" + allocationId + "\"\n" +
" }\n" +
" }\n" +
" ],\n" +
" \"nodeId1\" : [ ]\n" +
" }\n" +
" }\n" +
"}", Strings.toString(builder));
}
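/**
 * Same serialization check as above, but with flat_settings=true (settings rendered as dotted
 * keys) and reduce_mappings=false.
 */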
public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOException {
Map<String, String> mapParams = new HashMap<String,String>(){{
put("flat_settings", "true");
put("reduce_mappings", "false");
put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API);
}};
final ClusterState clusterState = buildClusterState();
IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index");
String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId();
String allocationId = index.getShards().get(1).getAllAllocationIds().iterator().next();
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
builder.startObject();
clusterState.toXContent(builder, new ToXContent.MapParams(mapParams));
builder.endObject();
assertEquals("{\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"version\" : 0,\n" +
" \"state_uuid\" : \"stateUUID\",\n" +
" \"master_node\" : \"masterNodeId\",\n" +
" \"blocks\" : {\n" +
" \"global\" : {\n" +
" \"1\" : {\n" +
" \"description\" : \"description\",\n" +
" \"retryable\" : true,\n" +
" \"disable_state_persistence\" : true,\n" +
" \"levels\" : [\n" +
" \"read\",\n" +
" \"write\",\n" +
" \"metadata_read\",\n" +
" \"metadata_write\"\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"2\" : {\n" +
" \"description\" : \"description2\",\n" +
" \"retryable\" : false,\n" +
" \"levels\" : [\n" +
" \"read\",\n" +
" \"write\",\n" +
" \"metadata_read\",\n" +
" \"metadata_write\"\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"nodes\" : {\n" +
" \"nodeId1\" : {\n" +
" \"name\" : \"\",\n" +
" \"ephemeral_id\" : \"" + ephemeralId + "\",\n" +
" \"transport_address\" : \"127.0.0.1:111\",\n" +
" \"attributes\" : { }\n" +
" }\n" +
" },\n" +
" \"metadata\" : {\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"cluster_uuid_committed\" : false,\n" +
" \"cluster_coordination\" : {\n" +
" \"term\" : 1,\n" +
" \"last_committed_config\" : [\n" +
" \"commitedConfigurationNodeId\"\n" +
" ],\n" +
" \"last_accepted_config\" : [\n" +
" \"acceptedConfigurationNodeId\"\n" +
" ],\n" +
" \"voting_config_exclusions\" : [\n" +
" {\n" +
" \"node_id\" : \"exlucdedNodeId\",\n" +
" \"node_name\" : \"excludedNodeName\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"templates\" : {\n" +
" \"template\" : {\n" +
" \"order\" : 0,\n" +
" \"index_patterns\" : [\n" +
" \"pattern1\",\n" +
" \"pattern2\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index.version.created\" : \"" + Version.CURRENT.id + "\"\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"key1\" : { }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }\n" +
" },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"version\" : 1,\n" +
" \"mapping_version\" : 1,\n" +
" \"settings_version\" : 1,\n" +
" \"aliases_version\" : 1,\n" +
" \"routing_num_shards\" : 1,\n" +
" \"state\" : \"open\",\n" +
" \"settings\" : {\n" +
" \"index.number_of_replicas\" : \"2\",\n" +
" \"index.number_of_shards\" : \"1\",\n" +
" \"index.version.created\" : \"" + Version.CURRENT.id + "\"\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"type1\" : {\n" +
" \"key\" : \"value\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : [\n" +
" \"alias\"\n" +
" ],\n" +
" \"primary_terms\" : {\n" +
" \"0\" : 1\n" +
" },\n" +
" \"in_sync_allocations\" : {\n" +
" \"0\" : [\n" +
" \"allocationId\"\n" +
" ]\n" +
" },\n" +
" \"rollover_info\" : {\n" +
" \"rolloveAlias\" : {\n" +
" \"met_conditions\" : { },\n" +
" \"time\" : 1\n" +
" }\n" +
" },\n" +
" \"system\" : false\n" +
" }\n" +
" },\n" +
" \"index-graveyard\" : {\n" +
" \"tombstones\" : [ ]\n" +
" }\n" +
" },\n" +
" \"routing_table\" : {\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"shards\" : {\n" +
" \"1\" : [\n" +
" {\n" +
" \"state\" : \"STARTED\",\n" +
" \"primary\" : true,\n" +
" \"node\" : \"nodeId2\",\n" +
" \"relocating_node\" : null,\n" +
" \"shard\" : 1,\n" +
" \"index\" : \"index\",\n" +
" \"allocation_id\" : {\n" +
" \"id\" : \"" + allocationId + "\"\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"routing_nodes\" : {\n" +
" \"unassigned\" : [ ],\n" +
" \"nodes\" : {\n" +
" \"nodeId2\" : [\n" +
" {\n" +
" \"state\" : \"STARTED\",\n" +
" \"primary\" : true,\n" +
" \"node\" : \"nodeId2\",\n" +
" \"relocating_node\" : null,\n" +
" \"shard\" : 1,\n" +
" \"index\" : \"index\",\n" +
" \"allocation_id\" : {\n" +
" \"id\" : \"" + allocationId + "\"\n" +
" }\n" +
" }\n" +
" ],\n" +
" \"nodeId1\" : [ ]\n" +
" }\n" +
" }\n" +
"}", Strings.toString(builder));
}
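/**
 * Same serialization check, but with flat_settings=false (nested settings objects) and
 * reduce_mappings=true.
 */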
public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOException {
Map<String, String> mapParams = new HashMap<String,String>(){{
put("flat_settings", "false");
put("reduce_mappings", "true");
put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API);
}};
final ClusterState clusterState = buildClusterState();
IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index");
String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId();
String allocationId = index.getShards().get(1).getAllAllocationIds().iterator().next();
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
builder.startObject();
clusterState.toXContent(builder, new ToXContent.MapParams(mapParams));
builder.endObject();
assertEquals("{\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"version\" : 0,\n" +
" \"state_uuid\" : \"stateUUID\",\n" +
" \"master_node\" : \"masterNodeId\",\n" +
" \"blocks\" : {\n" +
" \"global\" : {\n" +
" \"1\" : {\n" +
" \"description\" : \"description\",\n" +
" \"retryable\" : true,\n" +
" \"disable_state_persistence\" : true,\n" +
" \"levels\" : [\n" +
" \"read\",\n" +
" \"write\",\n" +
" \"metadata_read\",\n" +
" \"metadata_write\"\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"2\" : {\n" +
" \"description\" : \"description2\",\n" +
" \"retryable\" : false,\n" +
" \"levels\" : [\n" +
" \"read\",\n" +
" \"write\",\n" +
" \"metadata_read\",\n" +
" \"metadata_write\"\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"nodes\" : {\n" +
" \"nodeId1\" : {\n" +
" \"name\" : \"\",\n" +
" \"ephemeral_id\" : \"" + ephemeralId + "\",\n" +
" \"transport_address\" : \"127.0.0.1:111\",\n" +
" \"attributes\" : { }\n" +
" }\n" +
" },\n" +
" \"metadata\" : {\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"cluster_uuid_committed\" : false,\n" +
" \"cluster_coordination\" : {\n" +
" \"term\" : 1,\n" +
" \"last_committed_config\" : [\n" +
" \"commitedConfigurationNodeId\"\n" +
" ],\n" +
" \"last_accepted_config\" : [\n" +
" \"acceptedConfigurationNodeId\"\n" +
" ],\n" +
" \"voting_config_exclusions\" : [\n" +
" {\n" +
" \"node_id\" : \"exlucdedNodeId\",\n" +
" \"node_name\" : \"excludedNodeName\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"templates\" : {\n" +
" \"template\" : {\n" +
" \"order\" : 0,\n" +
" \"index_patterns\" : [\n" +
" \"pattern1\",\n" +
" \"pattern2\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"version\" : {\n" +
" \"created\" : \"" + Version.CURRENT.id + "\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"key1\" : { }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }\n" +
" },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"version\" : 1,\n" +
" \"mapping_version\" : 1,\n" +
" \"settings_version\" : 1,\n" +
" \"aliases_version\" : 1,\n" +
" \"routing_num_shards\" : 1,\n" +
" \"state\" : \"open\",\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"1\",\n" +
" \"number_of_replicas\" : \"2\",\n" +
" \"version\" : {\n" +
" \"created\" : \"" + Version.CURRENT.id + "\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"type1\" : {\n" +
" \"key\" : \"value\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : [\n" +
" \"alias\"\n" +
" ],\n" +
" \"primary_terms\" : {\n" +
" \"0\" : 1\n" +
" },\n" +
" \"in_sync_allocations\" : {\n" +
" \"0\" : [\n" +
" \"allocationId\"\n" +
" ]\n" +
" },\n" +
" \"rollover_info\" : {\n" +
" \"rolloveAlias\" : {\n" +
" \"met_conditions\" : { },\n" +
" \"time\" : 1\n" +
" }\n" +
" },\n" +
" \"system\" : false\n" +
" }\n" +
" },\n" +
" \"index-graveyard\" : {\n" +
" \"tombstones\" : [ ]\n" +
" }\n" +
" },\n" +
" \"routing_table\" : {\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"shards\" : {\n" +
" \"1\" : [\n" +
" {\n" +
" \"state\" : \"STARTED\",\n" +
" \"primary\" : true,\n" +
" \"node\" : \"nodeId2\",\n" +
" \"relocating_node\" : null,\n" +
" \"shard\" : 1,\n" +
" \"index\" : \"index\",\n" +
" \"allocation_id\" : {\n" +
" \"id\" : \"" + allocationId + "\"\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"routing_nodes\" : {\n" +
" \"unassigned\" : [ ],\n" +
" \"nodes\" : {\n" +
" \"nodeId2\" : [\n" +
" {\n" +
" \"state\" : \"STARTED\",\n" +
" \"primary\" : true,\n" +
" \"node\" : \"nodeId2\",\n" +
" \"relocating_node\" : null,\n" +
" \"shard\" : 1,\n" +
" \"index\" : \"index\",\n" +
" \"allocation_id\" : {\n" +
" \"id\" : \"" + allocationId + "\"\n" +
" }\n" +
" }\n" +
" ],\n" +
" \"nodeId1\" : [ ]\n" +
" }\n" +
" }\n" +
"}", Strings.toString(builder));
}
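/**
 * When the root key of a mapping equals its type name, toXContent reduces the duplicated
 * level, so the mapping is rendered as a single "type" object instead of nesting the type
 * name twice.
 */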
public void testToXContentSameTypeName() throws IOException {
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.stateUUID("stateUUID")
.metadata(Metadata.builder()
.clusterUUID("clusterUUID")
.coordinationMetadata(CoordinationMetadata.builder()
.build())
.put(IndexMetadata.builder("index")
.state(IndexMetadata.State.OPEN)
.settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT.id))
.putMapping(new MappingMetadata("type",
// the root value of the mapping equals the type name;
// ClusterState.toXContent reduces the duplicated level away
new HashMap<String,Object>(){{
put("type", new HashMap<String, Object>(){{
put("key", "value");
}});
}}))
.numberOfShards(1)
.primaryTerm(0, 1L)
.numberOfReplicas(2)))
.build();
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
builder.startObject();
clusterState.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
assertEquals("{\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"version\" : 0,\n" +
" \"state_uuid\" : \"stateUUID\",\n" +
" \"master_node\" : null,\n" +
" \"blocks\" : { },\n" +
" \"nodes\" : { },\n" +
" \"metadata\" : {\n" +
" \"cluster_uuid\" : \"clusterUUID\",\n" +
" \"cluster_uuid_committed\" : false,\n" +
" \"cluster_coordination\" : {\n" +
" \"term\" : 0,\n" +
" \"last_committed_config\" : [ ],\n" +
" \"last_accepted_config\" : [ ],\n" +
" \"voting_config_exclusions\" : [ ]\n" +
" },\n" +
" \"templates\" : { },\n" +
" \"indices\" : {\n" +
" \"index\" : {\n" +
" \"version\" : 2,\n" +
" \"mapping_version\" : 1,\n" +
" \"settings_version\" : 1,\n" +
" \"aliases_version\" : 1,\n" +
" \"routing_num_shards\" : 1,\n" +
" \"state\" : \"open\",\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"1\",\n" +
" \"number_of_replicas\" : \"2\",\n" +
" \"version\" : {\n" +
" \"created\" : \"" + Version.CURRENT.id + "\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"type\" : {\n" +
" \"key\" : \"value\"\n" +
" }\n" +
" },\n" +
" \"aliases\" : [ ],\n" +
" \"primary_terms\" : {\n" +
" \"0\" : 1\n" +
" },\n" +
" \"in_sync_allocations\" : {\n" +
" \"0\" : [ ]\n" +
" },\n" +
" \"rollover_info\" : { },\n" +
" \"system\" : false\n" +
" }\n" +
" },\n" +
" \"index-graveyard\" : {\n" +
" \"tombstones\" : [ ]\n" +
" }\n" +
" },\n" +
" \"routing_table\" : {\n" +
" \"indices\" : { }\n" +
" },\n" +
" \"routing_nodes\" : {\n" +
" \"unassigned\" : [ ],\n" +
" \"nodes\" : { }\n" +
" }\n" +
"}", Strings.toString(builder));
}
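/**
 * Builds a small but fully populated cluster state: one open index with an alias, a rollover
 * entry and an in-sync allocation id; one global and one index-level block; an index template;
 * coordination metadata with a voting config exclusion; and a routing table holding a single
 * started primary shard.
 */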
private ClusterState buildClusterState() throws IOException {
IndexMetadata indexMetadata = IndexMetadata.builder("index")
.state(IndexMetadata.State.OPEN)
.settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT.id))
.putMapping(new MappingMetadata("type",
new HashMap<String,Object>() {{
put("type1", new HashMap<String, Object>(){{
put("key", "value");
}});
}}))
.putAlias(AliasMetadata.builder("alias")
.indexRouting("indexRouting")
.build())
.numberOfShards(1)
.primaryTerm(0, 1L)
.putInSyncAllocationIds(0, new HashSet<String>() {{
add("allocationId");
}})
.numberOfReplicas(2)
.putRolloverInfo(new RolloverInfo("rolloveAlias", new ArrayList<>(), 1L))
.build();
return ClusterState.builder(ClusterName.DEFAULT)
.stateUUID("stateUUID")
.nodes(DiscoveryNodes.builder()
.masterNodeId("masterNodeId")
.add(new DiscoveryNode("nodeId1", new TransportAddress(InetAddress.getByName("127.0.0.1"), 111),
Version.CURRENT))
.build())
.blocks(ClusterBlocks.builder()
.addGlobalBlock(
new ClusterBlock(1, "description", true, true, true, RestStatus.ACCEPTED, EnumSet.allOf((ClusterBlockLevel.class))))
.addBlocks(indexMetadata)
.addIndexBlock("index",
new ClusterBlock(2, "description2", false, false, false, RestStatus.ACCEPTED, EnumSet.allOf((ClusterBlockLevel.class))))
.build())
.metadata(Metadata.builder()
.clusterUUID("clusterUUID")
.coordinationMetadata(CoordinationMetadata.builder()
.term(1)
.lastCommittedConfiguration(new CoordinationMetadata.VotingConfiguration(new HashSet<String>(){{
add("commitedConfigurationNodeId");
}}))
.lastAcceptedConfiguration(new CoordinationMetadata.VotingConfiguration(new HashSet<String>(){{
add("acceptedConfigurationNodeId");
}}))
.addVotingConfigExclusion(new CoordinationMetadata.VotingConfigExclusion("excludedNodeId", "excludedNodeName"))
.build())
.persistentSettings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT.id).build())
.transientSettings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT.id).build())
.put(indexMetadata, false)
.put(IndexTemplateMetadata.builder("template")
.patterns(Arrays.asList("pattern1", "pattern2"))
.order(0)
.settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT.id))
.putMapping("type", "{ \"key1\": {} }")
.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(new Index("index", "indexUUID"))
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("index", "_na_", 1))
.addShard(TestShardRouting.newShardRouting(
new ShardId("index", "_na_", 1), "nodeId2", true, ShardRoutingState.STARTED))
.build())
.build())
.build())
.build();
}
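/** A minimal custom metadata implementation for use in cluster state tests. */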
public static class CustomMetadata extends TestCustomMetadata {
public static final String TYPE = "custom_md";
CustomMetadata(String data) {
super(data);
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT;
}
@Override
public EnumSet<Metadata.XContentContext> context() {
return EnumSet.of(Metadata.XContentContext.GATEWAY, Metadata.XContentContext.SNAPSHOT);
}
}
Discovery: Add a dedicate queue for incoming ClusterStates The initial implementation of two phase commit based cluster state publishing (#13062) relied on a single in memory "pending" cluster state that is only processed by ZenDiscovery once committed by the master. While this is fine on it's own, it resulted in an issue with acknowledged APIs, such as the open index API, in the extreme case where a node falls behind and receives a commit message after a new cluster state has been published. Specifically: 1) Master receives and acked-API call and publishes cluster state CS1 2) Master waits for a min-master nodes to receives CS1 and commits it. 3) All nodes that have responded to CS1 are sent a commit message, however, node N didn't respond yet 4) Master waits for publish timeout (defaults to 30s) for all nodes to process the commit. Node N fails to do so. 5) Master publishes a cluster state CS2. Node N responds to cluster state CS1's publishing but receives cluster state CS2 before the commit for CS1 arrives. 6) The commit message for cluster CS1 is processed on node N, but fails because CS2 is pending. This caused the acked API in step 1 to return (but CS2 , is not yet processed). In this case, the action indicated by CS1 is not yet executed on node N and therefore the acked API calls return pre-maturely. Note that once CS2 is processed but the change in CS1 takes effect (cluster state operations are safe to batch and we do so all the time). An example failure can be found on: http://build-us-00.elastic.co/job/es_feature_two_phase_pub/314/ This commit extracts the already existing pending cluster state queue (processNewClusterStates) from ZenDiscovery into it's own class, which serves as a temporary container for in-flight cluster states. Once committed the cluster states are transferred to ZenDiscovery as they used to before. This allows "lagging" cluster states to still be successfully committed and processed (and likely to be ignored as a newer cluster state has already been processed). As a side effect, all batching logic is now extracted from ZenDiscovery and is unit tested.
2015-09-01 15:39:00 +02:00
}