improve handling when not to persist current state to gateway based on blocks and have a block indicate if it also blocks state persistence

This commit is contained in:
kimchy 2011-01-18 15:28:55 +02:00
parent 87d5a92edb
commit e4a6e99f69
9 changed files with 54 additions and 50 deletions

View File

@ -106,6 +106,7 @@
<w>nospawn</w>
<w>param</w>
<w>params</w>
<w>persistency</w>
<w>pinger</w>
<w>pluggable</w>
<w>plugins</w>

View File

@ -41,13 +41,16 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
private boolean retryable;
private ClusterBlock() {
private boolean disableStatePersistence = false;
ClusterBlock() {
}
public ClusterBlock(int id, String description, boolean retryable, ClusterBlockLevel... levels) {
public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, ClusterBlockLevel... levels) {
this.id = id;
this.description = description;
this.retryable = retryable;
this.disableStatePersistence = disableStatePersistence;
this.levels = levels;
}
@ -72,14 +75,28 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
return false;
}
/**
* Should operations get into retry state if this block is present.
*/
public boolean retryable() {
return this.retryable;
}
/**
* Should global state persistence be disabled when this block is present. Note,
* only relevant for global blocks.
*/
public boolean disableStatePersistence() {
return this.disableStatePersistence;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
builder.field("description", description);
builder.field("retryable", retryable);
if (disableStatePersistence) {
builder.field("disable_state_persistence", disableStatePersistence);
}
builder.startArray("levels");
for (ClusterBlockLevel level : levels) {
builder.value(level.name().toLowerCase());
@ -103,6 +120,7 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
levels[i] = ClusterBlockLevel.fromId(in.readVInt());
}
retryable = in.readBoolean();
disableStatePersistence = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -113,6 +131,7 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
out.writeVInt(level.id());
}
out.writeBoolean(retryable);
out.writeBoolean(disableStatePersistence);
}
public String toString() {

View File

@ -91,6 +91,18 @@ public class ClusterBlocks {
return levelHolders[level.id()].indices();
}
/**
* Returns <tt>true</tt> if one of the global blocks as its disable state persistence flag set.
*/
public boolean disableStatePersistence() {
for (ClusterBlock clusterBlock : global) {
if (clusterBlock.disableStatePersistence()) {
return true;
}
}
return false;
}
public boolean hasGlobalBlock(ClusterBlock block) {
return global.contains(block);
}

View File

@ -46,7 +46,6 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, IndexTemplateMetaData> templates;
private final transient int totalNumberOfShards;
private final boolean recoveredFromGateway;
private final String[] allIndices;
@ -55,10 +54,9 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;
private final ImmutableMap<String, ImmutableSet<String>> aliasAndIndexToIndexMap2;
private MetaData(ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates, boolean recoveredFromGateway) {
private MetaData(ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates) {
this.indices = ImmutableMap.copyOf(indices);
this.templates = templates;
this.recoveredFromGateway = recoveredFromGateway;
int totalNumberOfShards = 0;
for (IndexMetaData indexMetaData : indices.values()) {
totalNumberOfShards += indexMetaData.totalNumberOfShards();
@ -112,13 +110,6 @@ public class MetaData implements Iterable<IndexMetaData> {
this.aliasAndIndexToIndexMap2 = aliasAndIndexToIndexBuilder2.immutableMap();
}
/**
* Has the cluster state been recovered from the gateway.
*/
public boolean recoveredFromGateway() {
return this.recoveredFromGateway;
}
public ImmutableSet<String> aliases() {
return this.aliases;
}
@ -255,12 +246,9 @@ public class MetaData implements Iterable<IndexMetaData> {
private MapBuilder<String, IndexTemplateMetaData> templates = newMapBuilder();
private boolean recoveredFromGateway = false;
public Builder metaData(MetaData metaData) {
this.indices.putAll(metaData.indices);
this.templates.putAll(metaData.templates);
this.recoveredFromGateway = metaData.recoveredFromGateway();
return this;
}
@ -310,16 +298,8 @@ public class MetaData implements Iterable<IndexMetaData> {
return this;
}
/**
* Indicates that this cluster state has been recovered from the gateawy.
*/
public Builder markAsRecoveredFromGateway() {
this.recoveredFromGateway = true;
return this;
}
public MetaData build() {
return new MetaData(indices.immutableMap(), templates.immutableMap(), recoveredFromGateway);
return new MetaData(indices.immutableMap(), templates.immutableMap());
}
public static String toXContent(MetaData metaData) throws IOException {
@ -382,8 +362,6 @@ public class MetaData implements Iterable<IndexMetaData> {
public static MetaData readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
// we only serialize it using readFrom, not in to/from XContent
builder.recoveredFromGateway = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(IndexMetaData.Builder.readFrom(in));
@ -396,7 +374,6 @@ public class MetaData implements Iterable<IndexMetaData> {
}
public static void writeTo(MetaData metaData, StreamOutput out) throws IOException {
out.writeBoolean(metaData.recoveredFromGateway());
out.writeVInt(metaData.indices.size());
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData.Builder.writeTo(indexMetaData, out);

View File

@ -39,7 +39,7 @@ import org.elasticsearch.indices.IndexMissingException;
*/
public class MetaDataStateIndexService extends AbstractComponent {
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, ClusterBlockLevel.READ_WRITE);
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, false, ClusterBlockLevel.READ_WRITE);
private final ClusterService clusterService;

View File

@ -34,7 +34,7 @@ import org.elasticsearch.common.component.LifecycleComponent;
*/
public interface Discovery extends LifecycleComponent<Discovery> {
final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, ClusterBlockLevel.ALL);
final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, ClusterBlockLevel.ALL);
DiscoveryNode localNode();

View File

@ -54,7 +54,7 @@ import static org.elasticsearch.common.unit.TimeValue.*;
*/
public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener {
public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, ClusterBlockLevel.ALL);
public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, ClusterBlockLevel.ALL);
private final Gateway gateway;
@ -107,7 +107,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (discoveryService.initialStateReceived()) {
ClusterState clusterState = clusterService.state();
DiscoveryNodes nodes = clusterState.nodes();
if (clusterState.nodes().localNodeMaster() && !clusterState.metaData().recoveredFromGateway()) {
if (clusterState.nodes().localNodeMaster() && clusterState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
@ -142,7 +142,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
return;
}
if (event.localNodeMaster()) {
if (!event.state().metaData().recoveredFromGateway()) {
if (event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
ClusterState clusterState = event.state();
DiscoveryNodes nodes = clusterState.nodes();
if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
@ -228,8 +228,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData());
// mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway();
// add the index templates
for (Map.Entry<String, IndexTemplateMetaData> entry : recoveredState.metaData().templates().entrySet()) {
@ -298,15 +296,13 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private void markMetaDataAsReadFromGateway(String reason) {
clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData())
// mark the metadata as read from gateway
.markAsRecoveredFromGateway();
if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
return currentState;
}
// remove the block, since we recovered from gateway
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).blocks(blocks).build();
return newClusterStateBuilder().state(currentState).blocks(blocks).build();
}
});
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
@ -156,20 +155,20 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
// nothing to do until we actually recover from the gateway
if (!event.state().metaData().recoveredFromGateway()) {
// the location is set to null, so we should not store it (for example, its not a data/master node)
if (location == null) {
return;
}
// the location is set to null, so we should not store it (for example, its not a data/master node)
if (location == null) {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (event.state().blocks().disableStatePersistence()) {
return;
}
// we only write the local metadata if this is a possible master node, the metadata has changed, and
// we don't have a NO_MASTER block (in which case, the routing is cleaned, and we don't want to override what
// we have now, since it might be needed when later on performing full state recovery)
if (event.state().nodes().localNode().masterNode() && event.metaDataChanged() && !event.state().blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) {
executor.execute(new Runnable() {
@Override public void run() {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();

View File

@ -44,8 +44,6 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
private final ClusterService clusterService;
private volatile boolean performedStateRecovery = false;
private volatile ExecutorService executor;
public SharedStorageGateway(Settings settings, ClusterService clusterService) {
@ -72,7 +70,6 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
}
@Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
performedStateRecovery = true;
executor.execute(new Runnable() {
@Override public void run() {
logger.debug("reading state from gateway {} ...", this);
@ -99,9 +96,12 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
if (!lifecycle.started()) {
return;
}
if (!performedStateRecovery) {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (event.state().blocks().disableStatePersistence()) {
return;
}
if (event.localNodeMaster()) {
if (!event.metaDataChanged()) {
return;