add metadata level version, reducing the persistency requirements fo rit

This commit is contained in:
kimchy 2011-06-10 01:37:20 +03:00
parent f87b9e3656
commit 4b6e2ddd7d
6 changed files with 65 additions and 15 deletions

View File

@ -21,19 +21,31 @@ package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.*; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.Immutable; import org.elasticsearch.common.util.concurrent.Immutable;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.collect.Lists.newArrayList; import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.MapBuilder.*; import static org.elasticsearch.common.collect.MapBuilder.*;
import static org.elasticsearch.common.collect.Sets.*; import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.elasticsearch.common.settings.ImmutableSettings.*;
@ -46,6 +58,7 @@ public class MetaData implements Iterable<IndexMetaData> {
public static final MetaData EMPTY_META_DATA = newMetaDataBuilder().build(); public static final MetaData EMPTY_META_DATA = newMetaDataBuilder().build();
private final long version;
private final ImmutableMap<String, IndexMetaData> indices; private final ImmutableMap<String, IndexMetaData> indices;
private final ImmutableMap<String, IndexTemplateMetaData> templates; private final ImmutableMap<String, IndexTemplateMetaData> templates;
@ -60,7 +73,8 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap; private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;
private MetaData(ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates) { private MetaData(long version, ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates) {
this.version = version;
this.indices = ImmutableMap.copyOf(indices); this.indices = ImmutableMap.copyOf(indices);
this.templates = templates; this.templates = templates;
int totalNumberOfShards = 0; int totalNumberOfShards = 0;
@ -127,6 +141,10 @@ public class MetaData implements Iterable<IndexMetaData> {
this.aliasAndIndexToIndexMap = aliasAndIndexToIndexBuilder.immutableMap(); this.aliasAndIndexToIndexMap = aliasAndIndexToIndexBuilder.immutableMap();
} }
public long version() {
return this.version;
}
public ImmutableSet<String> aliases() { public ImmutableSet<String> aliases() {
return this.aliases; return this.aliases;
} }
@ -348,11 +366,14 @@ public class MetaData implements Iterable<IndexMetaData> {
public static class Builder { public static class Builder {
private long version;
private MapBuilder<String, IndexMetaData> indices = newMapBuilder(); private MapBuilder<String, IndexMetaData> indices = newMapBuilder();
private MapBuilder<String, IndexTemplateMetaData> templates = newMapBuilder(); private MapBuilder<String, IndexTemplateMetaData> templates = newMapBuilder();
public Builder metaData(MetaData metaData) { public Builder metaData(MetaData metaData) {
this.version = metaData.version;
this.indices.putAll(metaData.indices); this.indices.putAll(metaData.indices);
this.templates.putAll(metaData.templates); this.templates.putAll(metaData.templates);
return this; return this;
@ -420,8 +441,13 @@ public class MetaData implements Iterable<IndexMetaData> {
return this; return this;
} }
public Builder version(long version) {
this.version = version;
return this;
}
public MetaData build() { public MetaData build() {
return new MetaData(indices.immutableMap(), templates.immutableMap()); return new MetaData(version, indices.immutableMap(), templates.immutableMap());
} }
public static String toXContent(MetaData metaData) throws IOException { public static String toXContent(MetaData metaData) throws IOException {
@ -484,6 +510,7 @@ public class MetaData implements Iterable<IndexMetaData> {
public static MetaData readFrom(StreamInput in) throws IOException { public static MetaData readFrom(StreamInput in) throws IOException {
Builder builder = new Builder(); Builder builder = new Builder();
builder.version = in.readLong();
int size = in.readVInt(); int size = in.readVInt();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
builder.put(IndexMetaData.Builder.readFrom(in)); builder.put(IndexMetaData.Builder.readFrom(in));
@ -496,6 +523,7 @@ public class MetaData implements Iterable<IndexMetaData> {
} }
public static void writeTo(MetaData metaData, StreamOutput out) throws IOException { public static void writeTo(MetaData metaData, StreamOutput out) throws IOException {
out.writeLong(metaData.version);
out.writeVInt(metaData.indices.size()); out.writeVInt(metaData.indices.size());
for (IndexMetaData indexMetaData : metaData) { for (IndexMetaData indexMetaData : metaData) {
IndexMetaData.Builder.writeTo(indexMetaData, out); IndexMetaData.Builder.writeTo(indexMetaData, out);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
@ -200,6 +201,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (previousClusterState.routingTable() != clusterState.routingTable()) { if (previousClusterState.routingTable() != clusterState.routingTable()) {
builder.routingTable(RoutingTable.builder().routingTable(clusterState.routingTable()).version(clusterState.routingTable().version() + 1)); builder.routingTable(RoutingTable.builder().routingTable(clusterState.routingTable()).version(clusterState.routingTable().version() + 1));
} }
if (previousClusterState.metaData() != clusterState.metaData()) {
builder.metaData(MetaData.builder().metaData(clusterState.metaData()).version(clusterState.metaData().version() + 1));
}
clusterState = builder.build(); clusterState = builder.build();
} else { } else {
// we got this cluster state from the master, filter out based on versions (don't call listeners) // we got this cluster state from the master, filter out based on versions (don't call listeners)

View File

@ -241,6 +241,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable()); builder.routingTable(currentState.routingTable());
} }
if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
}
return builder.build(); return builder.build();
} }

View File

@ -435,6 +435,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
if (newState.routingTable().version() == currentState.routingTable().version()) { if (newState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable()); builder.routingTable(currentState.routingTable());
} }
// same for metadata
if (newState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
}
return builder.build(); return builder.build();
} }

View File

@ -20,11 +20,19 @@
package org.elasticsearch.gateway; package org.elasticsearch.gateway;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.*; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
@ -229,6 +237,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
MetaData.Builder metaDataBuilder = newMetaDataBuilder() MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData()); .metaData(currentState.metaData());
metaDataBuilder.version(recoveredState.version());
// add the index templates // add the index templates
for (Map.Entry<String, IndexTemplateMetaData> entry : recoveredState.metaData().templates().entrySet()) { for (Map.Entry<String, IndexTemplateMetaData> entry : recoveredState.metaData().templates().entrySet()) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.MutableShardRouting;
@ -165,7 +166,9 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
listener.onSuccess(ClusterState.builder().build()); listener.onSuccess(ClusterState.builder().build());
} else { } else {
logger.debug("elected state from [{}]", electedState.node()); logger.debug("elected state from [{}]", electedState.node());
listener.onSuccess(ClusterState.builder().version(electedState.state().version()).metaData(electedState.state().metaData()).build()); ClusterState.Builder builder = ClusterState.builder().version(electedState.state().version());
builder.metaData(MetaData.builder().metaData(electedState.state().metaData()).version(electedState.state().version()));
listener.onSuccess(builder.build());
} }
} }
@ -189,20 +192,19 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
} }
// we only write the local metadata if this is a possible master node // we only write the local metadata if this is a possible master node
// currently, we always write the metadata, since we want to keep it in sync with the latest version, but if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) {
// we need to think of a better way to not persist it when nothing changed
if (event.state().nodes().localNode().masterNode()) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override public void run() { @Override public void run() {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
if (currentMetaState != null) { if (currentMetaState != null) {
builder.state(currentMetaState); builder.state(currentMetaState);
} }
builder.version(event.state().version()); final long version = event.state().metaData().version();
builder.version(version);
builder.metaData(event.state().metaData()); builder.metaData(event.state().metaData());
try { try {
File stateFile = new File(location, "metadata-" + event.state().version()); File stateFile = new File(location, "metadata-" + version);
OutputStream fos = new FileOutputStream(stateFile); OutputStream fos = new FileOutputStream(stateFile);
if (compress) { if (compress) {
fos = new LZFOutputStream(fos); fos = new LZFOutputStream(fos);
@ -225,7 +227,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
// delete all the other files // delete all the other files
File[] files = location.listFiles(new FilenameFilter() { File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) { @Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + event.state().version()); return name.startsWith("metadata-") && !name.equals("metadata-" + version);
} }
}); });
for (File file : files) { for (File file : files) {