From 4b6e2ddd7d27cc6478f0fcf4c4d3b0fc2ddc007d Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 10 Jun 2011 01:37:20 +0300 Subject: [PATCH] add metadata level version, reducing the persistency requirements fo rit --- .../cluster/metadata/MetaData.java | 40 ++++++++++++++++--- .../service/InternalClusterService.java | 4 ++ .../discovery/local/LocalDiscovery.java | 3 ++ .../discovery/zen/ZenDiscovery.java | 4 ++ .../elasticsearch/gateway/GatewayService.java | 13 +++++- .../gateway/local/LocalGateway.java | 16 ++++---- 6 files changed, 65 insertions(+), 15 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 3c8731a93a9..9e538bef7f0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -21,19 +21,31 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.*; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.ImmutableSet; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.Immutable; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; -import static org.elasticsearch.common.collect.Lists.newArrayList; +import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.MapBuilder.*; import static org.elasticsearch.common.collect.Sets.*; import static org.elasticsearch.common.settings.ImmutableSettings.*; @@ -46,6 +58,7 @@ public class MetaData implements Iterable { public static final MetaData EMPTY_META_DATA = newMetaDataBuilder().build(); + private final long version; private final ImmutableMap indices; private final ImmutableMap templates; @@ -60,7 +73,8 @@ public class MetaData implements Iterable { private final ImmutableMap aliasAndIndexToIndexMap; - private MetaData(ImmutableMap indices, ImmutableMap templates) { + private MetaData(long version, ImmutableMap indices, ImmutableMap templates) { + this.version = version; this.indices = ImmutableMap.copyOf(indices); this.templates = templates; int totalNumberOfShards = 0; @@ -127,6 +141,10 @@ public class MetaData implements Iterable { this.aliasAndIndexToIndexMap = aliasAndIndexToIndexBuilder.immutableMap(); } + public long version() { + return this.version; + } + public ImmutableSet aliases() { return this.aliases; } @@ -348,11 +366,14 @@ public class MetaData implements Iterable { public static class Builder { + private long version; + private MapBuilder indices = newMapBuilder(); private MapBuilder templates = newMapBuilder(); public Builder metaData(MetaData metaData) { + this.version = metaData.version; this.indices.putAll(metaData.indices); this.templates.putAll(metaData.templates); return this; @@ -420,8 +441,13 @@ public class MetaData implements Iterable { return this; } + public Builder version(long version) { + this.version = version; + return this; + } + public MetaData build() { - return new MetaData(indices.immutableMap(), templates.immutableMap()); + return new MetaData(version, indices.immutableMap(), templates.immutableMap()); } public static String toXContent(MetaData metaData) throws IOException { @@ -484,6 +510,7 @@ public class MetaData implements Iterable { public static MetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(); + builder.version = in.readLong(); int size = in.readVInt(); for (int i = 0; i < size; i++) { builder.put(IndexMetaData.Builder.readFrom(in)); @@ -496,6 +523,7 @@ public class MetaData implements Iterable { } public static void writeTo(MetaData metaData, StreamOutput out) throws IOException { + out.writeLong(metaData.version); out.writeVInt(metaData.indices.size()); for (IndexMetaData indexMetaData : metaData) { IndexMetaData.Builder.writeTo(indexMetaData, out); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 312f20f15a3..e61c63b0d4c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; @@ -200,6 +201,9 @@ public class InternalClusterService extends AbstractLifecycleComponent implem if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); } + if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) { + builder.metaData(currentState.metaData()); + } return builder.build(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 6426b512e0b..32d1827530c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -435,6 +435,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen if (newState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); } + // same for metadata + if (newState.metaData().version() == currentState.metaData().version()) { + builder.metaData(currentState.metaData()); + } return builder.build(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index fcfb1ea42d2..b4ec712debf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -20,11 +20,19 @@ package org.elasticsearch.gateway; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.*; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -229,6 +237,7 @@ public class GatewayService extends AbstractLifecycleComponent i MetaData.Builder metaDataBuilder = newMetaDataBuilder() .metaData(currentState.metaData()); + metaDataBuilder.version(recoveredState.version()); // add the index templates for (Map.Entry entry : recoveredState.metaData().templates().entrySet()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 83a24d0e2ed..24664e01fe5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.MutableShardRouting; @@ -165,7 +166,9 @@ public class LocalGateway extends AbstractLifecycleComponent implements listener.onSuccess(ClusterState.builder().build()); } else { logger.debug("elected state from [{}]", electedState.node()); - listener.onSuccess(ClusterState.builder().version(electedState.state().version()).metaData(electedState.state().metaData()).build()); + ClusterState.Builder builder = ClusterState.builder().version(electedState.state().version()); + builder.metaData(MetaData.builder().metaData(electedState.state().metaData()).version(electedState.state().version())); + listener.onSuccess(builder.build()); } } @@ -189,20 +192,19 @@ public class LocalGateway extends AbstractLifecycleComponent implements } // we only write the local metadata if this is a possible master node - // currently, we always write the metadata, since we want to keep it in sync with the latest version, but - // we need to think of a better way to not persist it when nothing changed - if (event.state().nodes().localNode().masterNode()) { + if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) { executor.execute(new Runnable() { @Override public void run() { LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); if (currentMetaState != null) { builder.state(currentMetaState); } - builder.version(event.state().version()); + final long version = event.state().metaData().version(); + builder.version(version); builder.metaData(event.state().metaData()); try { - File stateFile = new File(location, "metadata-" + event.state().version()); + File stateFile = new File(location, "metadata-" + version); OutputStream fos = new FileOutputStream(stateFile); if (compress) { fos = new LZFOutputStream(fos); @@ -225,7 +227,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements // delete all the other files File[] files = location.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - return name.startsWith("metadata-") && !name.equals("metadata-" + event.state().version()); + return name.startsWith("metadata-") && !name.equals("metadata-" + version); } }); for (File file : files) {