From e4a6e99f6985c3fc3c5e6a7baa8040814957ad64 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 18 Jan 2011 15:28:55 +0200 Subject: [PATCH] improve handling when not to persist current state to gateway based on blocks and have a block indicate if it also blocks state persistence --- .idea/dictionaries/kimchy.xml | 1 + .../cluster/block/ClusterBlock.java | 23 ++++++++++++++-- .../cluster/block/ClusterBlocks.java | 12 +++++++++ .../cluster/metadata/MetaData.java | 27 ++----------------- .../metadata/MetaDataStateIndexService.java | 2 +- .../elasticsearch/discovery/Discovery.java | 2 +- .../elasticsearch/gateway/GatewayService.java | 18 +++++-------- .../gateway/local/LocalGateway.java | 11 ++++---- .../gateway/shared/SharedStorageGateway.java | 8 +++--- 9 files changed, 54 insertions(+), 50 deletions(-) diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 41e84a0cb5a..fa6a87baca0 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -106,6 +106,7 @@ nospawn param params + persistency pinger pluggable plugins diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index a89856cd5f4..12dde059c7c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -41,13 +41,16 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent { private boolean retryable; - private ClusterBlock() { + private boolean disableStatePersistence = false; + + ClusterBlock() { } - public ClusterBlock(int id, String description, boolean retryable, ClusterBlockLevel... levels) { + public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, ClusterBlockLevel... levels) { this.id = id; this.description = description; this.retryable = retryable; + this.disableStatePersistence = disableStatePersistence; this.levels = levels; } @@ -72,14 +75,28 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent { return false; } + /** + * Should operations get into retry state if this block is present. + */ public boolean retryable() { return this.retryable; } + /** + * Should global state persistence be disabled when this block is present. Note, + * only relevant for global blocks. + */ + public boolean disableStatePersistence() { + return this.disableStatePersistence; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Integer.toString(id)); builder.field("description", description); builder.field("retryable", retryable); + if (disableStatePersistence) { + builder.field("disable_state_persistence", disableStatePersistence); + } builder.startArray("levels"); for (ClusterBlockLevel level : levels) { builder.value(level.name().toLowerCase()); @@ -103,6 +120,7 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent { levels[i] = ClusterBlockLevel.fromId(in.readVInt()); } retryable = in.readBoolean(); + disableStatePersistence = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { @@ -113,6 +131,7 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent { out.writeVInt(level.id()); } out.writeBoolean(retryable); + out.writeBoolean(disableStatePersistence); } public String toString() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 0eae69bd205..0dff3bb5871 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -91,6 +91,18 @@ public class ClusterBlocks { return levelHolders[level.id()].indices(); } + /** + * Returns true if one of the global blocks as its disable state persistence flag set. + */ + public boolean disableStatePersistence() { + for (ClusterBlock clusterBlock : global) { + if (clusterBlock.disableStatePersistence()) { + return true; + } + } + return false; + } + public boolean hasGlobalBlock(ClusterBlock block) { return global.contains(block); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 30548397d5d..2b59bcf773e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -46,7 +46,6 @@ public class MetaData implements Iterable { private final ImmutableMap templates; private final transient int totalNumberOfShards; - private final boolean recoveredFromGateway; private final String[] allIndices; @@ -55,10 +54,9 @@ public class MetaData implements Iterable { private final ImmutableMap aliasAndIndexToIndexMap; private final ImmutableMap> aliasAndIndexToIndexMap2; - private MetaData(ImmutableMap indices, ImmutableMap templates, boolean recoveredFromGateway) { + private MetaData(ImmutableMap indices, ImmutableMap templates) { this.indices = ImmutableMap.copyOf(indices); this.templates = templates; - this.recoveredFromGateway = recoveredFromGateway; int totalNumberOfShards = 0; for (IndexMetaData indexMetaData : indices.values()) { totalNumberOfShards += indexMetaData.totalNumberOfShards(); @@ -112,13 +110,6 @@ public class MetaData implements Iterable { this.aliasAndIndexToIndexMap2 = aliasAndIndexToIndexBuilder2.immutableMap(); } - /** - * Has the cluster state been recovered from the gateway. - */ - public boolean recoveredFromGateway() { - return this.recoveredFromGateway; - } - public ImmutableSet aliases() { return this.aliases; } @@ -255,12 +246,9 @@ public class MetaData implements Iterable { private MapBuilder templates = newMapBuilder(); - private boolean recoveredFromGateway = false; - public Builder metaData(MetaData metaData) { this.indices.putAll(metaData.indices); this.templates.putAll(metaData.templates); - this.recoveredFromGateway = metaData.recoveredFromGateway(); return this; } @@ -310,16 +298,8 @@ public class MetaData implements Iterable { return this; } - /** - * Indicates that this cluster state has been recovered from the gateawy. - */ - public Builder markAsRecoveredFromGateway() { - this.recoveredFromGateway = true; - return this; - } - public MetaData build() { - return new MetaData(indices.immutableMap(), templates.immutableMap(), recoveredFromGateway); + return new MetaData(indices.immutableMap(), templates.immutableMap()); } public static String toXContent(MetaData metaData) throws IOException { @@ -382,8 +362,6 @@ public class MetaData implements Iterable { public static MetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(); - // we only serialize it using readFrom, not in to/from XContent - builder.recoveredFromGateway = in.readBoolean(); int size = in.readVInt(); for (int i = 0; i < size; i++) { builder.put(IndexMetaData.Builder.readFrom(in)); @@ -396,7 +374,6 @@ public class MetaData implements Iterable { } public static void writeTo(MetaData metaData, StreamOutput out) throws IOException { - out.writeBoolean(metaData.recoveredFromGateway()); out.writeVInt(metaData.indices.size()); for (IndexMetaData indexMetaData : metaData) { IndexMetaData.Builder.writeTo(indexMetaData, out); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index 110028b6035..dfa9e564602 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -39,7 +39,7 @@ import org.elasticsearch.indices.IndexMissingException; */ public class MetaDataStateIndexService extends AbstractComponent { - public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, ClusterBlockLevel.READ_WRITE); + public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, false, ClusterBlockLevel.READ_WRITE); private final ClusterService clusterService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/Discovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/Discovery.java index b987fc02782..877ad4559a5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -34,7 +34,7 @@ import org.elasticsearch.common.component.LifecycleComponent; */ public interface Discovery extends LifecycleComponent { - final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, ClusterBlockLevel.ALL); + final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, ClusterBlockLevel.ALL); DiscoveryNode localNode(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 784d83a0834..569cdf56d83 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -54,7 +54,7 @@ import static org.elasticsearch.common.unit.TimeValue.*; */ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener { - public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, ClusterBlockLevel.ALL); + public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, ClusterBlockLevel.ALL); private final Gateway gateway; @@ -107,7 +107,7 @@ public class GatewayService extends AbstractLifecycleComponent i if (discoveryService.initialStateReceived()) { ClusterState clusterState = clusterService.state(); DiscoveryNodes nodes = clusterState.nodes(); - if (clusterState.nodes().localNodeMaster() && !clusterState.metaData().recoveredFromGateway()) { + if (clusterState.nodes().localNodeMaster() && clusterState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) { logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]"); } else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) { @@ -142,7 +142,7 @@ public class GatewayService extends AbstractLifecycleComponent i return; } if (event.localNodeMaster()) { - if (!event.state().metaData().recoveredFromGateway()) { + if (event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { ClusterState clusterState = event.state(); DiscoveryNodes nodes = clusterState.nodes(); if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) { @@ -228,8 +228,6 @@ public class GatewayService extends AbstractLifecycleComponent i @Override public ClusterState execute(ClusterState currentState) { MetaData.Builder metaDataBuilder = newMetaDataBuilder() .metaData(currentState.metaData()); - // mark the metadata as read from gateway - metaDataBuilder.markAsRecoveredFromGateway(); // add the index templates for (Map.Entry entry : recoveredState.metaData().templates().entrySet()) { @@ -298,15 +296,13 @@ public class GatewayService extends AbstractLifecycleComponent i private void markMetaDataAsReadFromGateway(String reason) { clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder metaDataBuilder = newMetaDataBuilder() - .metaData(currentState.metaData()) - // mark the metadata as read from gateway - .markAsRecoveredFromGateway(); - + if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { + return currentState; + } // remove the block, since we recovered from gateway ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK); - return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).blocks(blocks).build(); + return newClusterStateBuilder().state(currentState).blocks(blocks).build(); } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index b58289ecc9f..a52397b88ed 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; @@ -156,20 +155,20 @@ public class LocalGateway extends AbstractLifecycleComponent implements } @Override public void clusterChanged(final ClusterChangedEvent event) { - // nothing to do until we actually recover from the gateway - if (!event.state().metaData().recoveredFromGateway()) { + // the location is set to null, so we should not store it (for example, its not a data/master node) + if (location == null) { return; } - // the location is set to null, so we should not store it (for example, its not a data/master node) - if (location == null) { + // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency + if (event.state().blocks().disableStatePersistence()) { return; } // we only write the local metadata if this is a possible master node, the metadata has changed, and // we don't have a NO_MASTER block (in which case, the routing is cleaned, and we don't want to override what // we have now, since it might be needed when later on performing full state recovery) - if (event.state().nodes().localNode().masterNode() && event.metaDataChanged() && !event.state().blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) { + if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) { executor.execute(new Runnable() { @Override public void run() { LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java index 02f0c250625..e1e33e4b799 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -44,8 +44,6 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent