From 942b427940f8dbc3695e391e2912969ded5625d8 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 22 Jan 2012 23:34:34 +0200 Subject: [PATCH] Local Gateway: Store specific index metadata under dedicated index locations, closes #1631. --- config/elasticsearch.yml | 8 +- .../state/TransportClusterStateAction.java | 2 +- .../cluster/metadata/IndexMetaData.java | 34 +- .../metadata/IndexTemplateMetaData.java | 26 + .../cluster/metadata/MetaData.java | 51 +- .../metadata/MetaDataCreateIndexService.java | 2 +- .../metadata/MetaDataMappingService.java | 6 +- .../cluster/routing/RoutingService.java | 16 +- .../service/InternalClusterService.java | 211 ++++---- .../common/collect/MapBuilder.java | 5 + .../discovery/zen/ZenDiscovery.java | 26 +- .../elasticsearch/env/NodeEnvironment.java | 23 +- .../org/elasticsearch/gateway/Gateway.java | 2 +- .../elasticsearch/gateway/GatewayService.java | 30 +- .../gateway/local/LocalGateway.java | 324 ++++--------- .../gateway/local/LocalGatewayMetaState.java | 131 ----- .../gateway/local/LocalGatewayModule.java | 3 + .../state/meta/LocalGatewayMetaState.java | 454 ++++++++++++++++++ .../TransportNodesListGatewayMetaState.java | 27 +- .../state/shards/LocalGatewayShardsState.java | 63 +-- .../gateway/shared/SharedStorageGateway.java | 15 +- .../test/integration/AbstractNodesTests.java | 4 + .../cluster/MinimumMasterNodesTests.java | 2 +- 23 files changed, 864 insertions(+), 601 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/gateway/local/LocalGatewayMetaState.java create mode 100644 src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java rename src/main/java/org/elasticsearch/gateway/local/{ => state/meta}/TransportNodesListGatewayMetaState.java (91%) diff --git a/config/elasticsearch.yml b/config/elasticsearch.yml index 9ff4d24e338..4789ccfd1e7 100644 --- a/config/elasticsearch.yml +++ b/config/elasticsearch.yml @@ -234,7 +234,8 @@ # gateway.type: local # Settings below control how and when to start the initial recovery process on -# a full cluster restart (to reuse as much local data as possible). +# a full cluster restart (to reuse as much local data as possible when using shared +# gateway). # Allow recovery process after N nodes in a cluster are up: # @@ -246,7 +247,8 @@ # gateway.recover_after_time: 5m # Set how many nodes are expected in this cluster. Once these N nodes -# are up, begin recovery process immediately: +# are up (and recover_after_nodes is met), begin recovery process immediately +# (without waiting for recover_after_time to expire): # # gateway.expected_nodes: 2 @@ -284,7 +286,7 @@ # Set to ensure a node sees N other master eligible nodes to be considered # operational within the cluster. Set this option to a higher value (2-4) -# for large clusters: +# for large clusters (>3 nodes): # # discovery.zen.minimum_master_nodes: 1 diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 48f3f4d0de0..a31b564186a 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -99,7 +99,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct for (String filteredIndex : indices) { IndexMetaData indexMetaData = currentState.metaData().index(filteredIndex); if (indexMetaData != null) { - mdBuilder.put(indexMetaData); + mdBuilder.put(indexMetaData, false); } } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 46a77fe43ec..3ed61adf271 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -117,6 +117,7 @@ public class IndexMetaData { public static final String SETTING_READ_ONLY = "index.blocks.read_only"; private final String index; + private final long version; private final State state; @@ -131,10 +132,11 @@ public class IndexMetaData { private final DiscoveryNodeFilters includeFilters; private final DiscoveryNodeFilters excludeFilters; - private IndexMetaData(String index, State state, Settings settings, ImmutableMap mappings, ImmutableMap aliases) { + private IndexMetaData(String index, long version, State state, Settings settings, ImmutableMap mappings, ImmutableMap aliases) { Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1) != -1, "must specify numberOfShards for index [" + index + "]"); Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1) != -1, "must specify numberOfReplicas for index [" + index + "]"); this.index = index; + this.version = version; this.state = state; this.settings = settings; this.mappings = mappings; @@ -164,6 +166,14 @@ public class IndexMetaData { return index(); } + public long version() { + return this.version; + } + + public long getVersion() { + return this.version; + } + public State state() { return this.state; } @@ -274,6 +284,8 @@ public class IndexMetaData { private State state = State.OPEN; + private long version = 1; + private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; private MapBuilder mappings = MapBuilder.newMapBuilder(); @@ -290,6 +302,7 @@ public class IndexMetaData { mappings.putAll(indexMetaData.mappings); aliases.putAll(indexMetaData.aliases); this.state = indexMetaData.state; + this.version = indexMetaData.version; } public String index() { @@ -364,6 +377,15 @@ public class IndexMetaData { return this; } + public long version() { + return this.version; + } + + public Builder version(long version) { + this.version = version; + return this; + } + public IndexMetaData build() { MapBuilder tmpAliases = aliases; Settings tmpSettings = settings; @@ -381,12 +403,13 @@ public class IndexMetaData { tmpSettings = ImmutableSettings.settingsBuilder().put(settings).putArray("index.aliases").build(); } - return new IndexMetaData(index, state, tmpSettings, mappings.immutableMap(), tmpAliases.immutableMap()); + return new IndexMetaData(index, version, state, tmpSettings, mappings.immutableMap(), tmpAliases.immutableMap()); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(indexMetaData.index(), XContentBuilder.FieldCaseConversion.NONE); + builder.field("version", indexMetaData.version()); builder.field("state", indexMetaData.state().toString().toLowerCase()); builder.startObject("settings"); @@ -416,6 +439,9 @@ public class IndexMetaData { } public static IndexMetaData fromXContent(XContentParser parser) throws IOException { + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + parser.nextToken(); + } Builder builder = new Builder(parser.currentName()); String currentFieldName = null; @@ -449,6 +475,8 @@ public class IndexMetaData { } else if (token.isValue()) { if ("state".equals(currentFieldName)) { builder.state(State.fromString(parser.text())); + } else if ("version".equals(currentFieldName)) { + builder.version(parser.longValue()); } } } @@ -457,6 +485,7 @@ public class IndexMetaData { public static IndexMetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(in.readUTF()); + builder.version(in.readLong()); builder.state(State.fromId(in.readByte())); builder.settings(readSettingsFromStream(in)); int mappingsSize = in.readVInt(); @@ -474,6 +503,7 @@ public class IndexMetaData { public static void writeTo(IndexMetaData indexMetaData, StreamOutput out) throws IOException { out.writeUTF(indexMetaData.index()); + out.writeLong(indexMetaData.version()); out.writeByte(indexMetaData.state().id()); writeSettingsToStream(indexMetaData.settings(), out); out.writeVInt(indexMetaData.mappings().size()); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java index f79e8ad623a..3026d4e753b 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java @@ -101,6 +101,32 @@ public class IndexTemplateMetaData { return new Builder(name); } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + IndexTemplateMetaData that = (IndexTemplateMetaData) o; + + if (order != that.order) return false; + if (!mappings.equals(that.mappings)) return false; + if (!name.equals(that.name)) return false; + if (!settings.equals(that.settings)) return false; + if (!template.equals(that.template)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + order; + result = 31 * result + template.hashCode(); + result = 31 * result + settings.hashCode(); + result = 31 * result + mappings.hashCode(); + return result; + } + public static class Builder { private String name; diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 16ff991aacd..88a00d8e833 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -628,6 +628,12 @@ public class MetaData implements Iterable { return indices.values().iterator(); } + public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2) { + if (!metaData1.persistentSettings.equals(metaData2.persistentSettings)) return false; + if (!metaData1.templates.equals(metaData2.templates())) return false; + return true; + } + public static Builder builder() { return new Builder(); } @@ -658,10 +664,21 @@ public class MetaData implements Iterable { } public Builder put(IndexMetaData.Builder indexMetaDataBuilder) { - return put(indexMetaDataBuilder.build()); + // we know its a new one, increment the version and store + indexMetaDataBuilder.version(indexMetaDataBuilder.version() + 1); + IndexMetaData indexMetaData = indexMetaDataBuilder.build(); + indices.put(indexMetaData.index(), indexMetaData); + return this; } - public Builder put(IndexMetaData indexMetaData) { + public Builder put(IndexMetaData indexMetaData, boolean incrementVersion) { + if (indices.get(indexMetaData.index()) == indexMetaData) { + return this; + } + // if we put a new index metadata, increment its version + if (incrementVersion) { + indexMetaData = IndexMetaData.newIndexMetaDataBuilder(indexMetaData).version(indexMetaData.version() + 1).build(); + } indices.put(indexMetaData.index(), indexMetaData); return this; } @@ -675,6 +692,11 @@ public class MetaData implements Iterable { return this; } + public Builder removeAllIndices() { + indices.clear(); + return this; + } + public Builder put(IndexTemplateMetaData.Builder template) { return put(template.build()); } @@ -699,8 +721,7 @@ public class MetaData implements Iterable { throw new IndexMissingException(new Index(index)); } put(IndexMetaData.newIndexMetaDataBuilder(indexMetaData) - .settings(settingsBuilder().put(indexMetaData.settings()).put(settings)) - .build()); + .settings(settingsBuilder().put(indexMetaData.settings()).put(settings))); } return this; } @@ -714,7 +735,7 @@ public class MetaData implements Iterable { if (indexMetaData == null) { throw new IndexMissingException(new Index(index)); } - put(IndexMetaData.newIndexMetaDataBuilder(indexMetaData).numberOfReplicas(numberOfReplicas).build()); + put(IndexMetaData.newIndexMetaDataBuilder(indexMetaData).numberOfReplicas(numberOfReplicas)); } return this; } @@ -757,6 +778,8 @@ public class MetaData implements Iterable { public static void toXContent(MetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject("meta-data"); + builder.field("version", metaData.version()); + if (!metaData.persistentSettings().getAsMap().isEmpty()) { builder.startObject("settings"); for (Map.Entry entry : metaData.persistentSettings().getAsMap().entrySet()) { @@ -771,11 +794,13 @@ public class MetaData implements Iterable { } builder.endObject(); - builder.startObject("indices"); - for (IndexMetaData indexMetaData : metaData) { - IndexMetaData.Builder.toXContent(indexMetaData, builder, params); + if (!metaData.indices().isEmpty()) { + builder.startObject("indices"); + for (IndexMetaData indexMetaData : metaData) { + IndexMetaData.Builder.toXContent(indexMetaData, builder, params); + } + builder.endObject(); } - builder.endObject(); builder.endObject(); } @@ -809,13 +834,17 @@ public class MetaData implements Iterable { builder.persistentSettings(settingsBuilder.build()); } else if ("indices".equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - builder.put(IndexMetaData.Builder.fromXContent(parser)); + builder.put(IndexMetaData.Builder.fromXContent(parser), false); } } else if ("templates".equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { builder.put(IndexTemplateMetaData.Builder.fromXContent(parser)); } } + } else if (token.isValue()) { + if ("version".equals(currentFieldName)) { + builder.version = parser.longValue(); + } } } return builder.build(); @@ -828,7 +857,7 @@ public class MetaData implements Iterable { builder.persistentSettings(readSettingsFromStream(in)); int size = in.readVInt(); for (int i = 0; i < size; i++) { - builder.put(IndexMetaData.Builder.readFrom(in)); + builder.put(IndexMetaData.Builder.readFrom(in), false); } size = in.readVInt(); for (int i = 0; i < size; i++) { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 7fd7c6fab47..5138cdaf0c2 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -256,7 +256,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { MetaData newMetaData = newMetaDataBuilder() .metaData(currentState.metaData()) - .put(indexMetaData) + .put(indexMetaData, false) .build(); logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index aff6d5fec4a..2561cb0cf68 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -264,6 +264,7 @@ public class MetaDataMappingService extends AbstractComponent { } public void putMapping(final PutRequest request, final Listener listener) { + final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -394,7 +395,7 @@ public class MetaDataMappingService extends AbstractComponent { } if (counter == 0) { - listener.onResponse(new Response(true)); + notifyOnPostProcess.set(true); return updatedState; } mappingCreatedAction.add(new CountDownListener(counter, listener), request.timeout); @@ -411,6 +412,9 @@ public class MetaDataMappingService extends AbstractComponent { @Override public void clusterStateProcessed(ClusterState clusterState) { + if (notifyOnPostProcess.get()) { + listener.onResponse(new Response(true)); + } } }); } diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 337b54d415f..639444bd34f 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -61,15 +61,19 @@ public class RoutingService extends AbstractLifecycleComponent i this.clusterService = clusterService; this.allocationService = allocationService; this.schedule = componentSettings.getAsTime("schedule", timeValueSeconds(10)); - } - - @Override - protected void doStart() throws ElasticSearchException { clusterService.addPriority(this); } + @Override + protected void doStart() throws ElasticSearchException { + } + @Override protected void doStop() throws ElasticSearchException { + } + + @Override + protected void doClose() throws ElasticSearchException { if (scheduledRoutingTableFuture != null) { scheduledRoutingTableFuture.cancel(true); scheduledRoutingTableFuture = null; @@ -77,10 +81,6 @@ public class RoutingService extends AbstractLifecycleComponent i clusterService.remove(this); } - @Override - protected void doClose() throws ElasticSearchException { - } - @Override public void clusterChanged(ClusterChangedEvent event) { if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) { diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index cfc53b4afd1..a59cf29a220 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -164,6 +164,8 @@ public class InternalClusterService extends AbstractLifecycleComponent it = onGoingTimeouts.iterator(); it.hasNext(); ) { NotifyTimeout timeout = it.next(); if (timeout.listener.equals(listener)) { @@ -204,109 +206,126 @@ public class InternalClusterService extends AbstractLifecycleComponent 0) { - logger.info("{}, reason: {}", summary, source); - } - } - - // TODO, do this in parallel (and wait) - for (DiscoveryNode node : nodesDelta.addedNodes()) { - if (!nodeRequiresConnection(node)) { - continue; - } - try { - transportService.connectToNode(node); - } catch (Exception e) { - // the fault detection will detect it as failed as well - logger.warn("failed to connect to node [" + node + "]", e); - } - } - - for (ClusterStateListener listener : priorityClusterStateListeners) { - listener.clusterChanged(clusterChangedEvent); - } - for (ClusterStateListener listener : clusterStateListeners) { - listener.clusterChanged(clusterChangedEvent); - } - for (ClusterStateListener listener : lastClusterStateListeners) { - listener.clusterChanged(clusterChangedEvent); - } - - if (!nodesDelta.removedNodes().isEmpty()) { - threadPool.cached().execute(new Runnable() { - @Override - public void run() { - for (DiscoveryNode node : nodesDelta.removedNodes()) { - transportService.disconnectFromNode(node); - } - } - }); - } - - // if we are the master, publish the new state to all nodes - if (clusterState.nodes().localNodeMaster()) { - discoveryService.publish(clusterState); - } - - if (updateTask instanceof ProcessedClusterStateUpdateTask) { - ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState); - } - - logger.debug("processing [{}]: done applying updated cluster_state", source); - } catch (Exception e) { - StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); - sb.append(clusterState.nodes().prettyPrint()); - sb.append(clusterState.routingTable().prettyPrint()); - sb.append(clusterState.readOnlyRoutingNodes().prettyPrint()); - logger.warn(sb.toString(), e); - } - } else { + if (previousClusterState == newClusterState) { logger.debug("processing [{}]: no change in cluster_state", source); + return; + } + + try { + if (newClusterState.nodes().localNodeMaster()) { + // only the master controls the version numbers + Builder builder = ClusterState.builder().state(newClusterState).version(newClusterState.version() + 1); + if (previousClusterState.routingTable() != newClusterState.routingTable()) { + builder.routingTable(RoutingTable.builder().routingTable(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1)); + } + if (previousClusterState.metaData() != newClusterState.metaData()) { + builder.metaData(MetaData.builder().metaData(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); + } + newClusterState = builder.build(); + } else { + if (previousClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK) && !newClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) { + // force an update, its a fresh update from the master as we transition from a start of not having a master to having one + // have a fresh instances of routing and metadata to remove the chance that version might be the same + Builder builder = ClusterState.builder().state(newClusterState); + builder.routingTable(RoutingTable.builder().routingTable(newClusterState.routingTable())); + builder.metaData(MetaData.builder().metaData(newClusterState.metaData())); + newClusterState = builder.build(); + logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId()); + } else if (newClusterState.version() < previousClusterState.version()) { + // we got this cluster state from the master, filter out based on versions (don't call listeners) + logger.debug("got old cluster state [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring"); + return; + } + } + + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n"); + sb.append(newClusterState.nodes().prettyPrint()); + sb.append(newClusterState.routingTable().prettyPrint()); + sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint()); + logger.trace(sb.toString()); + } else if (logger.isDebugEnabled()) { + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source); + } + + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState); + // new cluster state, notify all listeners + final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); + if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { + String summary = nodesDelta.shortSummary(); + if (summary.length() > 0) { + logger.info("{}, reason: {}", summary, source); + } + } + + // TODO, do this in parallel (and wait) + for (DiscoveryNode node : nodesDelta.addedNodes()) { + if (!nodeRequiresConnection(node)) { + continue; + } + try { + transportService.connectToNode(node); + } catch (Exception e) { + // the fault detection will detect it as failed as well + logger.warn("failed to connect to node [" + node + "]", e); + } + } + + // if we are the master, publish the new state to all nodes + // we publish here before we send a notification to all the listeners, since if it fails + // we don't want to notify + if (newClusterState.nodes().localNodeMaster()) { + discoveryService.publish(newClusterState); + } + + // update the current cluster state + clusterState = newClusterState; + + for (ClusterStateListener listener : priorityClusterStateListeners) { + listener.clusterChanged(clusterChangedEvent); + } + for (ClusterStateListener listener : clusterStateListeners) { + listener.clusterChanged(clusterChangedEvent); + } + for (ClusterStateListener listener : lastClusterStateListeners) { + listener.clusterChanged(clusterChangedEvent); + } + + if (!nodesDelta.removedNodes().isEmpty()) { + threadPool.cached().execute(new Runnable() { + @Override + public void run() { + for (DiscoveryNode node : nodesDelta.removedNodes()) { + transportService.disconnectFromNode(node); + } + } + }); + } + + + if (updateTask instanceof ProcessedClusterStateUpdateTask) { + ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(newClusterState); + } + + logger.debug("processing [{}]: done applying updated cluster_state", source); + } catch (Exception e) { + StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n"); + sb.append(newClusterState.nodes().prettyPrint()); + sb.append(newClusterState.routingTable().prettyPrint()); + sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint()); + logger.warn(sb.toString(), e); } } }); diff --git a/src/main/java/org/elasticsearch/common/collect/MapBuilder.java b/src/main/java/org/elasticsearch/common/collect/MapBuilder.java index 82176aa5362..e69e991f17c 100644 --- a/src/main/java/org/elasticsearch/common/collect/MapBuilder.java +++ b/src/main/java/org/elasticsearch/common/collect/MapBuilder.java @@ -59,6 +59,11 @@ public class MapBuilder { return this; } + public MapBuilder clear() { + this.map.clear(); + return this; + } + public V get(K key) { return map.get(key); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index efaed31d0cb..d1f1c72d47f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -315,9 +315,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen continue; } // send join request - ClusterState clusterState; + ClusterState joinClusterStateX; try { - clusterState = membership.sendJoinRequestBlocking(masterNode, localNode, pingTimeout); + joinClusterStateX = membership.sendJoinRequestBlocking(masterNode, localNode, pingTimeout); } catch (Exception e) { if (e instanceof ElasticSearchException) { logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage()); @@ -332,26 +332,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen continue; } masterFD.start(masterNode, "initial_join"); - - // we update the metadata once we managed to join, so we pre-create indices and so on (no shards allocation) - final MetaData metaData = clusterState.metaData(); - // sync also the version with the version the master currently has, so the next update will be applied - final long version = clusterState.version(); - clusterService.submitStateUpdateTask("zen-disco-join (detected master)", new ProcessedClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build(); - // make sure we have the local node id set, we might need it as a result of the new metadata - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder().putAll(currentState.nodes()).put(localNode).localNodeId(localNode.id()); - return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).blocks(clusterBlocks).metaData(metaData).version(version).build(); - } - - @Override - public void clusterStateProcessed(ClusterState clusterState) { - // don't send initial state event, since we want to get the cluster state from the master that includes us first -// sendInitialStateEventIfNeeded(); - } - }); + // no need to submit the received cluster state, we will get it from the master when it publishes + // the fact that we joined } } } diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index fa71a28a665..2ca6fdbe26b 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -20,6 +20,7 @@ package org.elasticsearch.env; import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; import org.apache.lucene.store.Lock; import org.apache.lucene.store.NativeFSLockFactory; import org.elasticsearch.ElasticSearchIllegalStateException; @@ -177,6 +178,23 @@ public class NodeEnvironment extends AbstractComponent { return shardLocations; } + public Set finalAllIndices() throws Exception { + if (nodeFiles == null || locks == null) { + throw new ElasticSearchIllegalStateException("node is not configured to store local location"); + } + Set indices = Sets.newHashSet(); + for (File indicesLocation : nodeIndicesLocations) { + File[] indicesList = indicesLocation.listFiles(); + if (indicesList == null) { + continue; + } + for (File indexLocation : indicesList) { + indices.add(indexLocation.getName()); + } + } + return indices; + } + public Set findAllShardIds() throws Exception { if (nodeFiles == null || locks == null) { throw new ElasticSearchIllegalStateException("node is not configured to store local location"); @@ -200,7 +218,10 @@ public class NodeEnvironment extends AbstractComponent { if (!shardLocation.isDirectory()) { continue; } - shardIds.add(new ShardId(indexName, Integer.parseInt(shardLocation.getName()))); + Integer shardId = Ints.tryParse(shardLocation.getName()); + if (shardId != null) { + shardIds.add(new ShardId(indexName, shardId)); + } } } } diff --git a/src/main/java/org/elasticsearch/gateway/Gateway.java b/src/main/java/org/elasticsearch/gateway/Gateway.java index 8fdd5a84190..3e34284431c 100644 --- a/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -39,6 +39,6 @@ public interface Gateway extends LifecycleComponent { interface GatewayStateRecoveredListener { void onSuccess(ClusterState recoveredState); - void onFailure(Throwable t); + void onFailure(String message); } } diff --git a/src/main/java/org/elasticsearch/gateway/GatewayService.java b/src/main/java/org/elasticsearch/gateway/GatewayService.java index b415cc8d33e..429b39296f4 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -42,7 +41,6 @@ import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,7 +90,8 @@ public class GatewayService extends AbstractLifecycleComponent i this.expectedNodes = componentSettings.getAsInt("expected_nodes", -1); this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1); this.expectedDataNodes = componentSettings.getAsInt("expected_data_nodes", -1); - this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", -1); + // default the recover after master nodes to the minimum master nodes in the discovery + this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", settings.getAsInt("discovery.zen.minimum_master_nodes", -1)); this.expectedMasterNodes = componentSettings.getAsInt("expected_master_nodes", -1); // Add the not recovered as initial state block, we don't allow anything until @@ -140,7 +139,7 @@ public class GatewayService extends AbstractLifecycleComponent i } else { logger.debug("can't wait on start for (possibly) reading state from gateway, will do it asynchronously"); } - clusterService.add(this); + clusterService.addLast(this); } @Override @@ -249,23 +248,14 @@ public class GatewayService extends AbstractLifecycleComponent i .removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK); MetaData.Builder metaDataBuilder = newMetaDataBuilder() - .metaData(currentState.metaData()); - metaDataBuilder.version(recoveredState.version()); - - metaDataBuilder.persistentSettings(recoveredState.metaData().persistentSettings()); - - // add the index templates - for (Map.Entry entry : recoveredState.metaData().templates().entrySet()) { - metaDataBuilder.put(entry.getValue()); - } - + .metaData(recoveredState.metaData()); if (recoveredState.metaData().settings().getAsBoolean(MetaData.SETTING_READ_ONLY, false) || currentState.metaData().settings().getAsBoolean(MetaData.SETTING_READ_ONLY, false)) { blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK); } for (IndexMetaData indexMetaData : recoveredState.metaData()) { - metaDataBuilder.put(indexMetaData); + metaDataBuilder.put(indexMetaData, false); if (indexMetaData.state() == IndexMetaData.State.CLOSE) { blocks.addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK); } @@ -276,7 +266,6 @@ public class GatewayService extends AbstractLifecycleComponent i // update the state to reflect the new metadata and routing ClusterState updatedState = newClusterStateBuilder().state(currentState) - .version(recoveredState.version()) .blocks(blocks) .metaData(metaDataBuilder) .build(); @@ -290,7 +279,8 @@ public class GatewayService extends AbstractLifecycleComponent i routingTableBuilder.add(indexRoutingBuilder); } } - routingTableBuilder.version(recoveredState.version()); + // start with 0 based versions for routing table + routingTableBuilder.version(0); // now, reroute RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); @@ -307,9 +297,11 @@ public class GatewayService extends AbstractLifecycleComponent i } @Override - public void onFailure(Throwable t) { + public void onFailure(String message) { + recovered.set(false); + scheduledRecovery.set(false); // don't remove the block here, we don't want to allow anything in such a case - logger.error("failed recover state, blocking...", t); + logger.info("metadata state not restored, reason: {}", message); } } } diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 79d8eb38034..7ddcc4d63f9 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -20,37 +20,29 @@ package org.elasticsearch.gateway.local; import com.google.common.collect.Sets; -import com.google.common.io.Closeables; +import gnu.trove.map.hash.TObjectIntHashMap; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.thread.LoggingRunnable; -import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState; +import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaState; import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState; import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; -import java.io.*; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.Executors.newSingleThreadExecutor; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; /** * @@ -62,32 +54,28 @@ public class LocalGateway extends AbstractLifecycleComponent implements private final NodeEnvironment nodeEnv; private final LocalGatewayShardsState shardsState; + private final LocalGatewayMetaState metaState; private final TransportNodesListGatewayMetaState listGatewayMetaState; - private final boolean compress; - private final boolean prettyPrint; - - private volatile LocalGatewayMetaState currentMetaState; - - private volatile ExecutorService executor; - - private volatile boolean initialized = false; - - private volatile boolean metaDataPersistedAtLeastOnce = false; + private final String initialMeta; @Inject - public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, LocalGatewayShardsState shardsState, + public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, + LocalGatewayShardsState shardsState, LocalGatewayMetaState metaState, TransportNodesListGatewayMetaState listGatewayMetaState) { super(settings); this.clusterService = clusterService; this.nodeEnv = nodeEnv; - this.listGatewayMetaState = listGatewayMetaState.initGateway(this); + this.metaState = metaState; + this.listGatewayMetaState = listGatewayMetaState; this.shardsState = shardsState; - this.compress = componentSettings.getAsBoolean("compress", true); - this.prettyPrint = componentSettings.getAsBoolean("pretty", false); + clusterService.addLast(this); + + // we define what is our minimum "master" nodes, use that to allow for recovery + this.initialMeta = componentSettings.get("initial_meta", settings.get("discovery.zen.minimum_master_nodes", "1")); } @Override @@ -95,31 +83,17 @@ public class LocalGateway extends AbstractLifecycleComponent implements return "local"; } - public LocalGatewayMetaState currentMetaState() { - lazyInitialize(); - return this.currentMetaState; - } - @Override protected void doStart() throws ElasticSearchException { - this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway")); - lazyInitialize(); - clusterService.addLast(this); } @Override protected void doStop() throws ElasticSearchException { - clusterService.remove(this); - executor.shutdown(); - try { - executor.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } } @Override protected void doClose() throws ElasticSearchException { + clusterService.remove(this); } @Override @@ -128,32 +102,90 @@ public class LocalGateway extends AbstractLifecycleComponent implements nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet()); TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet(); + + int requiredAllocation = 1; + try { + if ("quorum".equals(initialMeta)) { + if (nodesIds.size() > 2) { + requiredAllocation = (nodesIds.size() / 2) + 1; + } + } else if ("quorum-1".equals(initialMeta) || "half".equals(initialMeta)) { + if (nodesIds.size() > 2) { + requiredAllocation = ((1 + nodesIds.size()) / 2); + } + } else if ("one".equals(initialMeta)) { + requiredAllocation = 1; + } else if ("full".equals(initialMeta) || "all".equals(initialMeta)) { + requiredAllocation = nodesIds.size(); + } else if ("full-1".equals(initialMeta) || "all-1".equals(initialMeta)) { + if (nodesIds.size() > 1) { + requiredAllocation = nodesIds.size() - 1; + } + } else { + requiredAllocation = Integer.parseInt(initialMeta); + } + } catch (Exception e) { + logger.warn("failed to derived initial_meta from value {}", initialMeta); + } + if (nodesState.failures().length > 0) { for (FailedNodeException failedNodeException : nodesState.failures()) { logger.warn("failed to fetch state from node", failedNodeException); } } - TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState electedState = null; + MetaData.Builder metaDataBuilder = MetaData.builder(); + TObjectIntHashMap indices = new TObjectIntHashMap(); + MetaData electedGlobalState = null; + int found = 0; for (TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState nodeState : nodesState) { - if (nodeState.state() == null) { + if (nodeState.metaData() == null) { continue; } - if (electedState == null) { - electedState = nodeState; - } else if (nodeState.state().version() > electedState.state().version()) { - electedState = nodeState; + found++; + if (electedGlobalState == null) { + electedGlobalState = nodeState.metaData(); + } else if (nodeState.metaData().version() > electedGlobalState.version()) { + electedGlobalState = nodeState.metaData(); + } + for (IndexMetaData indexMetaData : nodeState.metaData().indices().values()) { + indices.adjustOrPutValue(indexMetaData.index(), 1, 1); } } - if (electedState == null) { - logger.debug("no state elected"); - listener.onSuccess(ClusterState.builder().build()); - } else { - logger.debug("elected state from [{}]", electedState.node()); - ClusterState.Builder builder = ClusterState.builder().version(electedState.state().version()); - builder.metaData(MetaData.builder().metaData(electedState.state().metaData()).version(electedState.state().version())); - listener.onSuccess(builder.build()); + if (found < requiredAllocation) { + listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]"); + return; } + // update the global state, and clean the indices, we elect them in the next phase + metaDataBuilder.metaData(electedGlobalState).removeAllIndices(); + for (String index : indices.keySet()) { + IndexMetaData electedIndexMetaData = null; + int indexMetaDataCount = 0; + for (TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState nodeState : nodesState) { + if (nodeState.metaData() == null) { + continue; + } + IndexMetaData indexMetaData = nodeState.metaData().index(index); + if (indexMetaData == null) { + continue; + } + if (electedIndexMetaData == null) { + electedIndexMetaData = indexMetaData; + } else if (indexMetaData.version() > electedIndexMetaData.version()) { + electedIndexMetaData = indexMetaData; + } + indexMetaDataCount++; + } + if (electedIndexMetaData != null) { + if (indexMetaDataCount < requiredAllocation) { + logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation); + } + metaDataBuilder.put(electedIndexMetaData, false); + } + } + ClusterState.Builder builder = ClusterState.builder(); + builder.metaData(metaDataBuilder); + listener.onSuccess(builder.build()); } @Override @@ -172,187 +204,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements if (event.state().blocks().disableStatePersistence()) { return; } - - // we only write the local metadata if this is a possible master node - if (event.state().nodes().localNode().masterNode() && (event.metaDataChanged() || !metaDataPersistedAtLeastOnce)) { - executor.execute(new LoggingRunnable(logger, new PersistMetaData(event))); - } - + metaState.clusterChanged(event); shardsState.clusterChanged(event); } - - /** - * We do here lazy initialization on not only on start(), since we might be called before start by another node (really will - * happen in term of timing in testing, but still), and we want to return the cluster state when we can. - *

- * It is synchronized since we want to wait for it to be loaded if called concurrently. There should really be a nicer - * solution here, but for now, its good enough. - */ - private synchronized void lazyInitialize() { - if (initialized) { - return; - } - initialized = true; - - if (clusterService.localNode().masterNode()) { - try { - File latest = findLatestMetaStateVersion(); - if (latest != null) { - logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath()); - this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest))); - } else { - logger.debug("[find_latest_state]: no metadata state loaded"); - } - } catch (Exception e) { - logger.warn("failed to read local state (metadata)", e); - } - } - } - - private File findLatestMetaStateVersion() throws IOException { - long index = -1; - File latest = null; - for (File dataLocation : nodeEnv.nodeDataLocations()) { - File stateLocation = new File(dataLocation, "_state"); - if (!stateLocation.exists()) { - continue; - } - File[] stateFiles = stateLocation.listFiles(); - if (stateFiles == null) { - continue; - } - for (File stateFile : stateFiles) { - if (logger.isTraceEnabled()) { - logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]"); - } - String name = stateFile.getName(); - if (!name.startsWith("metadata-")) { - continue; - } - long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); - if (fileIndex >= index) { - // try and read the meta data - try { - byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); - if (data.length == 0) { - logger.debug("[find_latest_state]: not data for [" + name + "], ignoring..."); - continue; - } - readMetaState(data); - index = fileIndex; - latest = stateFile; - } catch (IOException e) { - logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e); - } - } - } - } - return latest; - } - - private LocalGatewayMetaState readMetaState(byte[] data) throws IOException { - XContentParser parser = null; - try { - if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); - } else { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); - } - return LocalGatewayMetaState.Builder.fromXContent(parser); - } finally { - if (parser != null) { - parser.close(); - } - } - } - - class PersistMetaData implements Runnable { - private final ClusterChangedEvent event; - - public PersistMetaData(ClusterChangedEvent event) { - this.event = event; - } - - @Override - public void run() { - LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); - if (currentMetaState != null) { - builder.state(currentMetaState); - } - final long version = event.state().metaData().version(); - builder.version(version); - builder.metaData(event.state().metaData()); - LocalGatewayMetaState stateToWrite = builder.build(); - - CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); - StreamOutput streamOutput; - try { - try { - if (compress) { - streamOutput = cachedEntry.cachedLZFBytes(); - } else { - streamOutput = cachedEntry.cachedBytes(); - } - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput); - if (prettyPrint) { - xContentBuilder.prettyPrint(); - } - xContentBuilder.startObject(); - LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - xContentBuilder.close(); - } catch (Exception e) { - logger.warn("failed to serialize local gateway state", e); - return; - } - - boolean serializedAtLeastOnce = false; - for (File dataLocation : nodeEnv.nodeDataLocations()) { - File stateLocation = new File(dataLocation, "_state"); - if (!stateLocation.exists()) { - FileSystemUtils.mkdirs(stateLocation); - } - File stateFile = new File(stateLocation, "metadata-" + version); - FileOutputStream fos = null; - try { - fos = new FileOutputStream(stateFile); - fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size()); - fos.getChannel().force(true); - serializedAtLeastOnce = true; - } catch (Exception e) { - logger.warn("failed to write local gateway state to {}", e, stateFile); - } finally { - Closeables.closeQuietly(fos); - } - } - if (serializedAtLeastOnce) { - currentMetaState = stateToWrite; - metaDataPersistedAtLeastOnce = true; - - // delete all the other files - for (File dataLocation : nodeEnv.nodeDataLocations()) { - File stateLocation = new File(dataLocation, "_state"); - if (!stateLocation.exists()) { - continue; - } - File[] files = stateLocation.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("metadata-") && !name.equals("metadata-" + version); - } - }); - if (files != null) { - for (File file : files) { - file.delete(); - } - } - } - } - } finally { - CachedStreamOutput.pushEntry(cachedEntry); - } - } - } } diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayMetaState.java deleted file mode 100644 index 92a26d15303..00000000000 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayMetaState.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.gateway.local; - -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; - -import java.io.IOException; - -/** - * - */ -public class LocalGatewayMetaState { - - private final long version; - - private final MetaData metaData; - - public LocalGatewayMetaState(long version, MetaData metaData) { - this.version = version; - this.metaData = metaData; - } - - public long version() { - return version; - } - - public MetaData metaData() { - return metaData; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private long version; - - private MetaData metaData; - - public Builder state(LocalGatewayMetaState state) { - this.version = state.version(); - this.metaData = state.metaData(); - return this; - } - - public Builder version(long version) { - this.version = version; - return this; - } - - public Builder metaData(MetaData metaData) { - this.metaData = metaData; - return this; - } - - public LocalGatewayMetaState build() { - return new LocalGatewayMetaState(version, metaData); - } - - public static void toXContent(LocalGatewayMetaState state, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject("state"); - - builder.field("version", state.version()); - MetaData.Builder.toXContent(state.metaData(), builder, params); - - builder.endObject(); - } - - public static LocalGatewayMetaState fromXContent(XContentParser parser) throws IOException { - Builder builder = new Builder(); - - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - if (token == null) { - // no data... - return builder.build(); - } - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if ("meta-data".equals(currentFieldName)) { - builder.metaData = MetaData.Builder.fromXContent(parser); - } - } else if (token.isValue()) { - if ("version".equals(currentFieldName)) { - builder.version = parser.longValue(); - } - } - } - - return builder.build(); - } - - public static LocalGatewayMetaState readFrom(StreamInput in) throws IOException { - LocalGatewayMetaState.Builder builder = new Builder(); - builder.version = in.readLong(); - builder.metaData = MetaData.Builder.readFrom(in); - return builder.build(); - } - - public static void writeTo(LocalGatewayMetaState state, StreamOutput out) throws IOException { - out.writeLong(state.version()); - MetaData.Builder.writeTo(state.metaData(), out); - } - } - -} diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java index c2f2b727950..36f8465f36a 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.PreProcessModule; import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState; +import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaState; import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState; import org.elasticsearch.gateway.local.state.shards.TransportNodesListGatewayStartedShards; @@ -37,6 +39,7 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu bind(Gateway.class).to(LocalGateway.class).asEagerSingleton(); bind(LocalGatewayShardsState.class).asEagerSingleton(); bind(TransportNodesListGatewayMetaState.class).asEagerSingleton(); + bind(LocalGatewayMetaState.class).asEagerSingleton(); bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java new file mode 100644 index 00000000000..dab24bd85d8 --- /dev/null +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -0,0 +1,454 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local.state.meta; + +import com.google.common.io.Closeables; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Set; + +/** + * + */ +public class LocalGatewayMetaState extends AbstractComponent implements ClusterStateListener { + + private final NodeEnvironment nodeEnv; + + private volatile MetaData currentMetaData; + + @Inject + public LocalGatewayMetaState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception { + super(settings); + this.nodeEnv = nodeEnv; + + nodesListGatewayMetaState.init(this); + + try { + pre019Upgrade(); + long start = System.currentTimeMillis(); + loadState(); + logger.debug("took {} to load state", TimeValue.timeValueMillis(System.currentTimeMillis() - start)); + } catch (Exception e) { + logger.error("failed to read local state, exiting...", e); + throw e; + } + } + + public MetaData currentMetaData() { + return currentMetaData; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().disableStatePersistence()) { + return; + } + + if (!event.state().nodes().localNode().masterNode()) { + return; + } + + if (!event.metaDataChanged()) { + return; + } + + // check if the global state changed? + boolean success = true; + if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) { + try { + writeGlobalState("changed", event.state().metaData(), currentMetaData); + } catch (Exception e) { + success = false; + } + } + + // check and write changes in indices + for (IndexMetaData indexMetaData : event.state().metaData()) { + String writeReason = null; + IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index()); + if (currentIndexMetaData == null) { + writeReason = "freshly created"; + } else if (currentIndexMetaData.version() != indexMetaData.version()) { + writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]"; + } + + // we update the writeReason only if we really need to write it + if (writeReason == null) { + continue; + } + + try { + writeIndex(writeReason, indexMetaData, currentIndexMetaData); + } catch (Exception e) { + success = false; + } + } + + // delete indices that are no longer there... + if (currentMetaData != null) { + for (IndexMetaData current : currentMetaData) { + if (event.state().metaData().index(current.index()) == null) { + deleteIndex(current.index()); + } + } + } + + if (success) { + currentMetaData = event.state().metaData(); + } + } + + private void deleteIndex(String index) { + logger.trace("[{}] delete index state", index); + File[] indexLocations = nodeEnv.indexLocations(new Index(index)); + for (File indexLocation : indexLocations) { + if (!indexLocation.exists()) { + continue; + } + FileSystemUtils.deleteRecursively(new File(indexLocation, "_state")); + } + } + + private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception { + logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes()); + builder.startObject(); + IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + builder.flush(); + + Exception lastFailure = null; + boolean wroteAtLeastOnce = false; + for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) { + File stateLocation = new File(indexLocation, "_state"); + FileSystemUtils.mkdirs(stateLocation); + File stateFile = new File(stateLocation, "state-" + indexMetaData.version()); + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(stateFile); + fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size()); + fos.getChannel().force(true); + Closeables.closeQuietly(fos); + wroteAtLeastOnce = true; + } catch (Exception e) { + lastFailure = e; + } finally { + Closeables.closeQuietly(fos); + } + } + + if (!wroteAtLeastOnce) { + logger.warn("[{}]: failed to state", lastFailure, indexMetaData.index()); + throw new IOException("failed to write state for [" + indexMetaData.index() + "]", lastFailure); + } + + // delete the old files + if (previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version()) { + for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) { + File stateFile = new File(new File(indexLocation, "_state"), "state-" + previousIndexMetaData.version()); + stateFile.delete(); + } + } + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + + private void writeGlobalState(String reason, MetaData metaData, @Nullable MetaData previousMetaData) throws Exception { + logger.trace("[_global] writing state, reason [{}]", reason); + // create metadata to write with just the global state + MetaData globalMetaData = MetaData.builder().metaData(metaData).removeAllIndices().build(); + + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes()); + builder.startObject(); + MetaData.Builder.toXContent(globalMetaData, builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + builder.flush(); + + Exception lastFailure = null; + boolean wroteAtLeastOnce = false; + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateLocation = new File(dataLocation, "_state"); + FileSystemUtils.mkdirs(stateLocation); + File stateFile = new File(stateLocation, "global-" + globalMetaData.version()); + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(stateFile); + fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size()); + fos.getChannel().force(true); + Closeables.closeQuietly(fos); + wroteAtLeastOnce = true; + } catch (Exception e) { + lastFailure = e; + } finally { + Closeables.closeQuietly(fos); + } + } + + if (!wroteAtLeastOnce) { + logger.warn("[_global]: failed to write global state", lastFailure); + throw new IOException("failed to write global state", lastFailure); + } + + // delete the old files + if (previousMetaData != null && previousMetaData.version() != currentMetaData.version()) { + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateFile = new File(new File(dataLocation, "_state"), "global-" + previousMetaData.version()); + stateFile.delete(); + } + } + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + + private void loadState() throws Exception { + MetaData.Builder metaDataBuilder = MetaData.builder(); + MetaData globalMetaData = loadGlobalState(); + if (globalMetaData != null) { + metaDataBuilder.metaData(globalMetaData); + } + + Set indices = nodeEnv.finalAllIndices(); + for (String index : indices) { + IndexMetaData indexMetaData = loadIndex(index); + if (indexMetaData == null) { + logger.debug("[{}] failed to find metadata for existing index location", index); + } else { + metaDataBuilder.put(indexMetaData, false); + } + } + currentMetaData = metaDataBuilder.build(); + } + + private IndexMetaData loadIndex(String index) { + long highestVersion = -1; + IndexMetaData indexMetaData = null; + for (File indexLocation : nodeEnv.indexLocations(new Index(index))) { + File stateDir = new File(indexLocation, "_state"); + if (!stateDir.exists() || !stateDir.isDirectory()) { + continue; + } + // now, iterate over the current versions, and find latest one + File[] stateFiles = stateDir.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + if (!stateFile.getName().startsWith("state-")) { + continue; + } + try { + long version = Long.parseLong(stateFile.getName().substring("state-".length())); + if (version > highestVersion) { + byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); + if (data.length == 0) { + logger.debug("[{}]: no data for [" + stateFile.getAbsolutePath() + "], ignoring...", index); + continue; + } + XContentParser parser = null; + try { + parser = XContentHelper.createParser(data, 0, data.length); + parser.nextToken(); // move to START_OBJECT + indexMetaData = IndexMetaData.Builder.fromXContent(parser); + highestVersion = version; + } finally { + if (parser != null) { + parser.close(); + } + } + } + } catch (Exception e) { + logger.debug("[{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, index); + } + } + } + return indexMetaData; + } + + private MetaData loadGlobalState() { + long highestVersion = -1; + MetaData metaData = null; + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateLocation = new File(dataLocation, "_state"); + if (!stateLocation.exists()) { + continue; + } + File[] stateFiles = stateLocation.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + String name = stateFile.getName(); + if (!name.startsWith("global-")) { + continue; + } + try { + long version = Long.parseLong(stateFile.getName().substring("global-".length())); + if (version > highestVersion) { + byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); + if (data.length == 0) { + logger.debug("[_global] no data for [" + stateFile.getAbsolutePath() + "], ignoring..."); + continue; + } + + XContentParser parser = null; + try { + parser = XContentHelper.createParser(data, 0, data.length); + metaData = MetaData.Builder.fromXContent(parser); + highestVersion = version; + } finally { + if (parser != null) { + parser.close(); + } + } + } + } catch (Exception e) { + logger.debug(""); + } + } + } + + return metaData; + } + + private void pre019Upgrade() throws Exception { + long index = -1; + File metaDataFile = null; + MetaData metaData = null; + long version = -1; + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateLocation = new File(dataLocation, "_state"); + if (!stateLocation.exists()) { + continue; + } + File[] stateFiles = stateLocation.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + if (logger.isTraceEnabled()) { + logger.trace("[upgrade]: processing [" + stateFile.getName() + "]"); + } + String name = stateFile.getName(); + if (!name.startsWith("metadata-")) { + continue; + } + long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); + if (fileIndex >= index) { + // try and read the meta data + try { + byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); + if (data.length == 0) { + continue; + } + XContentParser parser = XContentHelper.createParser(data, 0, data.length); + try { + String currentFieldName = null; + XContentParser.Token token = parser.nextToken(); + if (token != null) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("meta-data".equals(currentFieldName)) { + metaData = MetaData.Builder.fromXContent(parser); + } + } else if (token.isValue()) { + if ("version".equals(currentFieldName)) { + version = parser.longValue(); + } + } + } + } + } finally { + parser.close(); + } + index = fileIndex; + metaDataFile = stateFile; + } catch (IOException e) { + logger.warn("failed to read pre 0.19 state from [" + name + "], ignoring...", e); + } + } + } + } + if (metaData == null) { + return; + } + + logger.info("found old metadata state, loading metadata from [{}] and converting to new metadata location and strucutre...", metaDataFile.getAbsolutePath()); + + writeGlobalState("upgrade", MetaData.builder().metaData(metaData).version(version).build(), null); + for (IndexMetaData indexMetaData : metaData) { + writeIndex("upgrade", IndexMetaData.newIndexMetaDataBuilder(indexMetaData).version(version).build(), null); + } + + // rename shards state to backup state + File backupFile = new File(metaDataFile.getParentFile(), "backup-" + metaDataFile.getName()); + if (!metaDataFile.renameTo(backupFile)) { + throw new IOException("failed to rename old state to backup state [" + metaDataFile.getAbsolutePath() + "]"); + } + + // delete all other shards state files + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateLocation = new File(dataLocation, "_state"); + if (!stateLocation.exists()) { + continue; + } + File[] stateFiles = stateLocation.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + String name = stateFile.getName(); + if (!name.startsWith("metadata-")) { + continue; + } + stateFile.delete(); + } + } + + logger.info("conversion to new metadata location and format done, backup create at [{}]", backupFile.getAbsolutePath()); + } +} diff --git a/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java similarity index 91% rename from src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java rename to src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java index 13d57974de8..e798b03bff5 100644 --- a/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.gateway.local; +package org.elasticsearch.gateway.local.state.meta; import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; @@ -26,6 +26,7 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -46,15 +47,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray; */ public class TransportNodesListGatewayMetaState extends TransportNodesOperationAction { - private LocalGateway gateway; + private LocalGatewayMetaState metaState; @Inject public TransportNodesListGatewayMetaState(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) { super(settings, clusterName, threadPool, clusterService, transportService); } - TransportNodesListGatewayMetaState initGateway(LocalGateway gateway) { - this.gateway = gateway; + TransportNodesListGatewayMetaState init(LocalGatewayMetaState metaState) { + this.metaState = metaState; return this; } @@ -115,7 +116,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA @Override protected NodeLocalGatewayMetaState nodeOperation(NodeRequest request) throws ElasticSearchException { - return new NodeLocalGatewayMetaState(clusterService.localNode(), gateway.currentMetaState()); + return new NodeLocalGatewayMetaState(clusterService.localNode(), metaState.currentMetaData()); } @Override @@ -208,36 +209,36 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA public static class NodeLocalGatewayMetaState extends NodeOperationResponse { - private LocalGatewayMetaState state; + private MetaData metaData; NodeLocalGatewayMetaState() { } - public NodeLocalGatewayMetaState(DiscoveryNode node, LocalGatewayMetaState state) { + public NodeLocalGatewayMetaState(DiscoveryNode node, MetaData metaData) { super(node); - this.state = state; + this.metaData = metaData; } - public LocalGatewayMetaState state() { - return state; + public MetaData metaData() { + return metaData; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); if (in.readBoolean()) { - state = LocalGatewayMetaState.Builder.readFrom(in); + metaData = MetaData.Builder.readFrom(in); } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (state == null) { + if (metaData == null) { out.writeBoolean(false); } else { out.writeBoolean(true); - LocalGatewayMetaState.Builder.writeTo(state, out); + MetaData.Builder.writeTo(metaData, out); } } } diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java index 8b7cdee383b..194548faac5 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java @@ -22,7 +22,6 @@ package org.elasticsearch.gateway.local.state.shards; import com.google.common.collect.Maps; import com.google.common.io.Closeables; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.Nullable; @@ -37,10 +36,7 @@ import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; @@ -57,21 +53,27 @@ import java.util.Set; public class LocalGatewayShardsState extends AbstractComponent implements ClusterStateListener { private final NodeEnvironment nodeEnv; - private final ClusterService clusterService; - private volatile boolean initialized = false; private volatile Map currentState = Maps.newHashMap(); @Inject - public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, ClusterService clusterService, TransportNodesListGatewayStartedShards listGatewayStartedShards) { + public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception { super(settings); this.nodeEnv = nodeEnv; - this.clusterService = clusterService; listGatewayStartedShards.initGateway(this); + + try { + pre019Upgrade(); + long start = System.currentTimeMillis(); + loadStartedShards(); + logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start)); + } catch (Exception e) { + logger.error("failed to read local state (started shards), exiting...", e); + throw e; + } } public Map currentStartedShards() { - lazyInitialize(); return this.currentState; } @@ -160,30 +162,6 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste this.currentState = newState; } - private synchronized void lazyInitialize() { - if (initialized) { - return; - } - initialized = true; - - // we only persist shards state for data nodes - if (!clusterService.localNode().dataNode()) { - return; - } - - try { - pre019Upgrade(); - long start = System.currentTimeMillis(); - loadStartedShards(); - logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start)); - } catch (Exception e) { - logger.error("failed to read local state (started shards), exiting...", e); - // ugly, but, if we fail to read it, bail completely so we don't have any node corrupting the cluster - System.exit(1); - } - } - - private void loadStartedShards() throws Exception { Set shardIds = nodeEnv.findAllShardIds(); long highestVersion = -1; @@ -244,13 +222,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste private long readShardState(byte[] data) throws Exception { XContentParser parser = null; try { - if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); - } else { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); - } + parser = XContentHelper.createParser(data, 0, data.length); XContentParser.Token token = parser.nextToken(); if (token == null) { return -1; @@ -313,7 +285,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste } // delete the old files - if (previousStateInfo != null) { + if (previousStateInfo != null && previousStateInfo.version != shardStateInfo.version) { for (File shardLocation : nodeEnv.shardLocations(shardId)) { File stateFile = new File(new File(shardLocation, "_state"), "state-" + previousStateInfo.version); stateFile.delete(); @@ -361,13 +333,13 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste try { byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); if (data.length == 0) { - logger.debug("[find_latest_state]: not data for [" + name + "], ignoring..."); + logger.debug("[upgrade]: not data for [" + name + "], ignoring..."); } pre09ReadState(data); index = fileIndex; latest = stateFile; } catch (IOException e) { - logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e); + logger.warn("[upgrade]: failed to read state from [" + name + "], ignoring...", e); } } } @@ -400,9 +372,6 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste continue; } for (File stateFile : stateFiles) { - if (logger.isTraceEnabled()) { - logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]"); - } String name = stateFile.getName(); if (!name.startsWith("shards-")) { continue; diff --git a/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java index e2f121d946e..3b3d92cbfe0 100644 --- a/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java +++ b/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -20,6 +20,7 @@ package org.elasticsearch.gateway.shared; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -53,16 +54,20 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent starting the previous master node again..."); startNode(masterNodeName, settings); - clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForNodes("2").execute().actionGet(); assertThat(clusterHealthResponse.timedOut(), equalTo(false)); state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().state();