From 534f487de3edd7b6a4ca6d6aed7bdbc5b0ae2b7a Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 18 Jan 2012 01:08:35 +0200 Subject: [PATCH] Local Gateway: Move shard state to be stored under each shard, and not globally under _state, closes #1618. --- .../elasticsearch/env/NodeEnvironment.java | 32 ++ .../gateway/local/LocalGateway.java | 258 +--------- .../gateway/local/LocalGatewayAllocator.java | 1 + .../gateway/local/LocalGatewayModule.java | 3 + .../local/LocalGatewayStartedShards.java | 185 ------- .../state/shards/LocalGatewayShardsState.java | 472 ++++++++++++++++++ .../local/state/shards/ShardStateInfo.java | 31 ++ ...ransportNodesListGatewayStartedShards.java | 17 +- .../SimpleRecoveryLocalGatewayTests.java | 4 +- 9 files changed, 568 insertions(+), 435 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java create mode 100644 src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java create mode 100644 src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java rename src/main/java/org/elasticsearch/gateway/local/{ => state/shards}/TransportNodesListGatewayStartedShards.java (94%) diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index c28043ed2df..fa71a28a665 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -19,6 +19,7 @@ package org.elasticsearch.env; +import com.google.common.collect.Sets; import org.apache.lucene.store.Lock; import org.apache.lucene.store.NativeFSLockFactory; import org.elasticsearch.ElasticSearchIllegalStateException; @@ -34,6 +35,7 @@ import org.elasticsearch.index.shard.ShardId; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Set; /** * @@ -175,6 +177,36 @@ public class NodeEnvironment extends AbstractComponent { return 
shardLocations; } + public Set findAllShardIds() throws Exception { + if (nodeFiles == null || locks == null) { + throw new ElasticSearchIllegalStateException("node is not configured to store local location"); + } + Set shardIds = Sets.newHashSet(); + for (File indicesLocation : nodeIndicesLocations) { + File[] indicesList = indicesLocation.listFiles(); + if (indicesList == null) { + continue; + } + for (File indexLocation : indicesList) { + if (!indexLocation.isDirectory()) { + continue; + } + String indexName = indexLocation.getName(); + File[] shardsList = indexLocation.listFiles(); + if (shardsList == null) { + continue; + } + for (File shardLocation : shardsList) { + if (!shardLocation.isDirectory()) { + continue; + } + shardIds.add(new ShardId(indexName, Integer.parseInt(shardLocation.getName()))); + } + } + } + return shardIds; + } + public void close() { if (locks != null) { for (Lock lock : locks) { diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 1fae23024c0..79d8eb38034 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.inject.Inject; @@ -42,8 +41,8 @@ import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState; import 
org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; -import org.elasticsearch.index.shard.ShardId; import java.io.*; import java.util.Set; @@ -58,24 +57,19 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF */ public class LocalGateway extends AbstractLifecycleComponent implements Gateway, ClusterStateListener { - private boolean requiresStatePersistence; - private final ClusterService clusterService; private final NodeEnvironment nodeEnv; + private final LocalGatewayShardsState shardsState; + private final TransportNodesListGatewayMetaState listGatewayMetaState; - private final TransportNodesListGatewayStartedShards listGatewayStartedShards; - - private final boolean compress; private final boolean prettyPrint; private volatile LocalGatewayMetaState currentMetaState; - private volatile LocalGatewayStartedShards currentStartedShards; - private volatile ExecutorService executor; private volatile boolean initialized = false; @@ -83,13 +77,14 @@ public class LocalGateway extends AbstractLifecycleComponent implements private volatile boolean metaDataPersistedAtLeastOnce = false; @Inject - public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, - TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayStartedShards listGatewayStartedShards) { + public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, LocalGatewayShardsState shardsState, + TransportNodesListGatewayMetaState listGatewayMetaState) { super(settings); this.clusterService = clusterService; this.nodeEnv = nodeEnv; this.listGatewayMetaState = listGatewayMetaState.initGateway(this); - this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this); + + this.shardsState = shardsState; this.compress = componentSettings.getAsBoolean("compress", true); this.prettyPrint = componentSettings.getAsBoolean("pretty", false); @@ -105,16 +100,11 @@ public class 
LocalGateway extends AbstractLifecycleComponent implements return this.currentMetaState; } - public LocalGatewayStartedShards currentStartedShards() { - lazyInitialize(); - return this.currentStartedShards; - } - @Override protected void doStart() throws ElasticSearchException { this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway")); lazyInitialize(); - clusterService.add(this); + clusterService.addLast(this); } @Override @@ -178,10 +168,6 @@ public class LocalGateway extends AbstractLifecycleComponent implements @Override public void clusterChanged(final ClusterChangedEvent event) { - if (!requiresStatePersistence) { - return; - } - // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency if (event.state().blocks().disableStatePersistence()) { return; @@ -192,50 +178,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements executor.execute(new LoggingRunnable(logger, new PersistMetaData(event))); } - if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) { - LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder(); - if (currentStartedShards != null) { - builder.state(currentStartedShards); - } - builder.version(event.state().version()); - - boolean changed = false; - - // remove from the current state all the shards that are primary and started somewhere, we won't need them anymore - // and if they are still here, we will add them in the next phase - - // Also note, this works well when closing an index, since a closed index will have no routing shards entries - // so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed) - for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == 
indexShardRoutingTable.size()) { - changed |= builder.remove(indexShardRoutingTable.shardId()); - } - } - } - // remove deleted indices from the started shards - for (ShardId shardId : builder.build().shards().keySet()) { - if (!event.state().metaData().hasIndex(shardId.index().name())) { - changed |= builder.remove(shardId); - } - } - // now, add all the ones that are active and on this node - RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); - if (routingNode != null) { - // out node is not in play yet... - for (MutableShardRouting shardRouting : routingNode) { - if (shardRouting.active()) { - changed |= builder.put(shardRouting.shardId(), shardRouting.version()); - } - } - } - - // only write if something changed... - if (changed) { - final LocalGatewayStartedShards stateToWrite = builder.build(); - executor.execute(new LoggingRunnable(logger, new PersistShards(event, stateToWrite))); - } - } + shardsState.clusterChanged(event); } /** @@ -251,84 +194,21 @@ public class LocalGateway extends AbstractLifecycleComponent implements } initialized = true; - // if this is not a possible master node or data node, bail, we won't save anything here... 
- if (!clusterService.localNode().masterNode() && !clusterService.localNode().dataNode()) { - requiresStatePersistence = false; - } else { - // create the location where the state will be stored - // TODO: we might want to persist states on all data locations - requiresStatePersistence = true; - - if (clusterService.localNode().masterNode()) { - try { - File latest = findLatestMetaStateVersion(); - if (latest != null) { - logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath()); - this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest))); - } else { - logger.debug("[find_latest_state]: no metadata state loaded"); - } - } catch (Exception e) { - logger.warn("failed to read local state (metadata)", e); - } - } - - if (clusterService.localNode().dataNode()) { - try { - File latest = findLatestStartedShardsVersion(); - if (latest != null) { - logger.debug("[find_latest_state]: loading started shards from [{}]", latest.getAbsolutePath()); - this.currentStartedShards = readStartedShards(Streams.copyToByteArray(new FileInputStream(latest))); - } else { - logger.debug("[find_latest_state]: no started shards loaded"); - } - } catch (Exception e) { - logger.warn("failed to read local state (started shards)", e); + if (clusterService.localNode().masterNode()) { + try { + File latest = findLatestMetaStateVersion(); + if (latest != null) { + logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath()); + this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest))); + } else { + logger.debug("[find_latest_state]: no metadata state loaded"); } + } catch (Exception e) { + logger.warn("failed to read local state (metadata)", e); } } } - private File findLatestStartedShardsVersion() throws IOException { - long index = -1; - File latest = null; - for (File dataLocation : nodeEnv.nodeDataLocations()) { - File stateLocation = new File(dataLocation, "_state"); - 
if (!stateLocation.exists()) { - continue; - } - File[] stateFiles = stateLocation.listFiles(); - if (stateFiles == null) { - continue; - } - for (File stateFile : stateFiles) { - if (logger.isTraceEnabled()) { - logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]"); - } - String name = stateFile.getName(); - if (!name.startsWith("shards-")) { - continue; - } - long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); - if (fileIndex >= index) { - // try and read the meta data - try { - byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); - if (data.length == 0) { - logger.debug("[find_latest_state]: not data for [" + name + "], ignoring..."); - } - readStartedShards(data); - index = fileIndex; - latest = stateFile; - } catch (IOException e) { - logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e); - } - } - } - } - return latest; - } - private File findLatestMetaStateVersion() throws IOException { long index = -1; File latest = null; @@ -388,24 +268,6 @@ public class LocalGateway extends AbstractLifecycleComponent implements } } - private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOException { - XContentParser parser = null; - try { - if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); - } else { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); - } - return LocalGatewayStartedShards.Builder.fromXContent(parser); - } finally { - if (parser != null) { - parser.close(); - } - } - } - class PersistMetaData implements Runnable { private final ClusterChangedEvent event; @@ -493,86 +355,4 @@ public class LocalGateway extends AbstractLifecycleComponent implements } } } - - class PersistShards implements Runnable { - private final ClusterChangedEvent 
event; - private final LocalGatewayStartedShards stateToWrite; - - public PersistShards(ClusterChangedEvent event, LocalGatewayStartedShards stateToWrite) { - this.event = event; - this.stateToWrite = stateToWrite; - } - - @Override - public void run() { - - CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); - try { - StreamOutput streamOutput; - try { - if (compress) { - streamOutput = cachedEntry.cachedLZFBytes(); - } else { - streamOutput = cachedEntry.cachedBytes(); - } - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput); - if (prettyPrint) { - xContentBuilder.prettyPrint(); - } - xContentBuilder.startObject(); - LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - xContentBuilder.close(); - } catch (Exception e) { - logger.warn("failed to serialize local gateway shard states", e); - return; - } - - boolean serializedAtLeastOnce = false; - for (File dataLocation : nodeEnv.nodeDataLocations()) { - File stateLocation = new File(dataLocation, "_state"); - if (!stateLocation.exists()) { - FileSystemUtils.mkdirs(stateLocation); - } - File stateFile = new File(stateLocation, "shards-" + event.state().version()); - FileOutputStream fos = null; - try { - fos = new FileOutputStream(stateFile); - fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size()); - fos.getChannel().force(true); - serializedAtLeastOnce = true; - } catch (Exception e) { - logger.warn("failed to write local gateway shards state to {}", e, stateFile); - } finally { - Closeables.closeQuietly(fos); - } - } - - if (serializedAtLeastOnce) { - currentStartedShards = stateToWrite; - - // delete all the other files - for (File dataLocation : nodeEnv.nodeDataLocations()) { - File stateLocation = new File(dataLocation, "_state"); - if (!stateLocation.exists()) { - continue; - } - File[] files = stateLocation.listFiles(new FilenameFilter() { 
- @Override - public boolean accept(File dir, String name) { - return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); - } - }); - if (files != null) { - for (File file : files) { - file.delete(); - } - } - } - } - } finally { - CachedStreamOutput.pushEntry(cachedEntry); - } - } - } } diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java index ddb2fca430a..ac9b4171e09 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.gateway.local.state.shards.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java index f3ae64bc2f9..c2f2b727950 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.PreProcessModule; import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState; +import org.elasticsearch.gateway.local.state.shards.TransportNodesListGatewayStartedShards; /** * @@ -33,6 +35,7 @@ public class LocalGatewayModule extends 
AbstractModule implements PreProcessModu @Override protected void configure() { bind(Gateway.class).to(LocalGateway.class).asEagerSingleton(); + bind(LocalGatewayShardsState.class).asEagerSingleton(); bind(TransportNodesListGatewayMetaState.class).asEagerSingleton(); bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java deleted file mode 100644 index 33afc114ad3..00000000000 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.gateway.local; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.util.Map; - -/** - * - */ -public class LocalGatewayStartedShards { - - private final long version; - - private final ImmutableMap shards; - - public LocalGatewayStartedShards(long version, Map shards) { - this.version = version; - this.shards = ImmutableMap.copyOf(shards); - } - - public long version() { - return version; - } - - public ImmutableMap shards() { - return shards; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private long version; - - private Map shards = Maps.newHashMap(); - - public Builder state(LocalGatewayStartedShards state) { - this.version = state.version(); - this.shards.putAll(state.shards); - return this; - } - - public Builder version(long version) { - this.version = version; - return this; - } - - /** - * Returns true if something really changed. - */ - public boolean remove(ShardId shardId) { - return shards.remove(shardId) != null; - } - - /** - * Returns true if something really changed. 
- */ - public boolean put(ShardId shardId, long version) { - Long lVersion = shards.get(shardId); - if (lVersion != null && lVersion == version) { - return false; - } - this.shards.put(shardId, version); - return true; - } - - public LocalGatewayStartedShards build() { - return new LocalGatewayStartedShards(version, shards); - } - - public static void toXContent(LocalGatewayStartedShards state, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject("state"); - - builder.field("version", state.version()); - - builder.startArray("shards"); - for (Map.Entry entry : state.shards.entrySet()) { - builder.startObject(); - builder.field("index", entry.getKey().index().name()); - builder.field("id", entry.getKey().id()); - builder.field("version", entry.getValue()); - builder.endObject(); - } - builder.endArray(); - - builder.endObject(); - } - - public static LocalGatewayStartedShards fromXContent(XContentParser parser) throws IOException { - Builder builder = new Builder(); - - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - if (token == null) { - // no data... 
- return builder.build(); - } - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_ARRAY) { - if ("shards".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - if (token == XContentParser.Token.START_OBJECT) { - String shardIndex = null; - int shardId = -1; - long version = -1; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token.isValue()) { - if ("index".equals(currentFieldName)) { - shardIndex = parser.text(); - } else if ("id".equals(currentFieldName)) { - shardId = parser.intValue(); - } else if ("version".equals(currentFieldName)) { - version = parser.longValue(); - } - } - } - builder.shards.put(new ShardId(shardIndex, shardId), version); - } - } - } - } else if (token.isValue()) { - if ("version".equals(currentFieldName)) { - builder.version = parser.longValue(); - } - } - } - - return builder.build(); - } - - public static LocalGatewayStartedShards readFrom(StreamInput in) throws IOException { - LocalGatewayStartedShards.Builder builder = new Builder(); - builder.version = in.readLong(); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - builder.shards.put(ShardId.readShardId(in), in.readLong()); - } - return builder.build(); - } - - public static void writeTo(LocalGatewayStartedShards state, StreamOutput out) throws IOException { - out.writeLong(state.version()); - - out.writeVInt(state.shards.size()); - for (Map.Entry entry : state.shards.entrySet()) { - entry.getKey().writeTo(out); - out.writeLong(entry.getValue()); - } - } - } - -} diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java 
b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java new file mode 100644 index 00000000000..8b7cdee383b --- /dev/null +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java @@ -0,0 +1,472 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.gateway.local.state.shards; + +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.compress.lzf.LZF; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.LZFStreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + */ +public class LocalGatewayShardsState extends AbstractComponent implements ClusterStateListener { + + private final NodeEnvironment nodeEnv; + private final ClusterService clusterService; + + private volatile boolean initialized = false; + private volatile Map currentState = Maps.newHashMap(); + + @Inject + public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, ClusterService clusterService, TransportNodesListGatewayStartedShards 
listGatewayStartedShards) { + super(settings); + this.nodeEnv = nodeEnv; + this.clusterService = clusterService; + listGatewayStartedShards.initGateway(this); + } + + public Map currentStartedShards() { + lazyInitialize(); + return this.currentState; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().disableStatePersistence()) { + return; + } + + if (!event.state().nodes().localNode().dataNode()) { + return; + } + + if (!event.routingTableChanged()) { + return; + } + + Map newState = Maps.newHashMap(); + newState.putAll(this.currentState); + + + // remove from the current state all the shards that are completely started somewhere, we won't need them anymore + // and if they are still here, we will add them in the next phase + // Also note, this works well when closing an index, since a closed index will have no routing shards entries + // so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed) + for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) { + newState.remove(indexShardRoutingTable.shardId()); + } + } + } + // remove deleted indices from the started shards + for (ShardId shardId : currentState.keySet()) { + if (!event.state().metaData().hasIndex(shardId.index().name())) { + newState.remove(shardId); + } + } + // now, add all the ones that are active and on this node + RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); + if (routingNode != null) { + // our node is not in play yet... 
+ for (MutableShardRouting shardRouting : routingNode) { + if (shardRouting.active()) { + newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version())); + } + } + } + + // go over the write started shards if needed + for (Iterator> it = newState.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + ShardId shardId = entry.getKey(); + ShardStateInfo shardStateInfo = entry.getValue(); + + String writeReason = null; + ShardStateInfo currentShardStateInfo = currentState.get(shardId); + if (currentShardStateInfo == null) { + writeReason = "freshly started, version [" + shardStateInfo.version + "]"; + } else if (currentShardStateInfo.version != shardStateInfo.version) { + writeReason = "version changed from [" + currentShardStateInfo.version + "] to [" + shardStateInfo.version + "]"; + } + + // we update the write reason if we really need to write a new one... + if (writeReason == null) { + continue; + } + + try { + writeShardState(writeReason, shardId, shardStateInfo, currentShardStateInfo); + } catch (Exception e) { + // we failed to write the shard state, remove it from our builder, we will try and write + // it next time... 
+ it.remove(); + } + } + + // now, go over the current ones and delete ones that are not in the new one + for (Map.Entry entry : currentState.entrySet()) { + ShardId shardId = entry.getKey(); + if (!newState.containsKey(shardId)) { + deleteShardState(shardId); + } + } + + this.currentState = newState; + } + + private synchronized void lazyInitialize() { + if (initialized) { + return; + } + initialized = true; + + // we only persist shards state for data nodes + if (!clusterService.localNode().dataNode()) { + return; + } + + try { + pre019Upgrade(); + long start = System.currentTimeMillis(); + loadStartedShards(); + logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start)); + } catch (Exception e) { + logger.error("failed to read local state (started shards), exiting...", e); + // ugly, but, if we fail to read it, bail completely so we don't have any node corrupting the cluster + System.exit(1); + } + } + + + private void loadStartedShards() throws Exception { + Set shardIds = nodeEnv.findAllShardIds(); + long highestVersion = -1; + Map shardsState = Maps.newHashMap(); + for (ShardId shardId : shardIds) { + long highestShardVersion = -1; + File highestShardFile = null; + for (File shardLocation : nodeEnv.shardLocations(shardId)) { + File shardStateDir = new File(shardLocation, "_state"); + if (!shardStateDir.exists() || !shardStateDir.isDirectory()) { + continue; + } + // now, iterate over the current versions, and find latest one + File[] stateFiles = shardStateDir.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + if (!stateFile.getName().startsWith("state-")) { + continue; + } + try { + long version = Long.parseLong(stateFile.getName().substring("state-".length())); + if (version > highestShardVersion) { + byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); + if (data.length == 0) { + logger.debug("[{}][{}]: not data for [" + 
stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); + continue; + } + long readStateVersion = readShardState(data); + assert readStateVersion == version; + highestShardVersion = version; + highestShardFile = stateFile; + } + } catch (Exception e) { + logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id()); + } + } + } + // did we find a state file? + if (highestShardFile == null) { + continue; + } + + shardsState.put(shardId, new ShardStateInfo(highestShardVersion)); + + // update the global version + if (highestShardVersion > highestVersion) { + highestVersion = highestShardVersion; + } + } + // update the current started shards only if there is data there... + if (highestVersion != -1) { + currentState = shardsState; + } + } + + private long readShardState(byte[] data) throws Exception { + XContentParser parser = null; + try { + if (LZF.isCompressed(data)) { + BytesStreamInput siBytes = new BytesStreamInput(data, false); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); + } else { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + } + XContentParser.Token token = parser.nextToken(); + if (token == null) { + return -1; + } + long version = -1; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("version".equals(currentFieldName)) { + version = parser.longValue(); + } + } + } + return version; + } finally { + if (parser != null) { + parser.close(); + } + } + } + + private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception { + logger.trace("[{}][{}] writing shard 
state, reason [{}]", shardId.index().name(), shardId.id(), reason); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes()); + builder.prettyPrint(); + builder.startObject(); + builder.field("version", shardStateInfo.version); + builder.endObject(); + builder.flush(); + + Exception lastFailure = null; + boolean wroteAtLeastOnce = false; + for (File shardLocation : nodeEnv.shardLocations(shardId)) { + File shardStateDir = new File(shardLocation, "_state"); + FileSystemUtils.mkdirs(shardStateDir); + File stateFile = new File(shardStateDir, "state-" + shardStateInfo.version); + + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(stateFile); + fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size()); + fos.getChannel().force(true); + Closeables.closeQuietly(fos); + wroteAtLeastOnce = true; + } catch (Exception e) { + lastFailure = e; + } finally { + Closeables.closeQuietly(fos); + } + } + + if (!wroteAtLeastOnce) { + logger.warn("[{}][{}]: failed to write shard state", shardId.index().name(), shardId.id(), lastFailure); + throw new IOException("failed to write shard state for " + shardId, lastFailure); + } + + // delete the old files + if (previousStateInfo != null) { + for (File shardLocation : nodeEnv.shardLocations(shardId)) { + File stateFile = new File(new File(shardLocation, "_state"), "state-" + previousStateInfo.version); + stateFile.delete(); + } + } + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + + private void deleteShardState(ShardId shardId) { + logger.trace("[{}][{}] delete shard state", shardId.index().name(), shardId.id()); + File[] shardLocations = nodeEnv.shardLocations(shardId); + for (File shardLocation : shardLocations) { + if (!shardLocation.exists()) { + continue; + } + FileSystemUtils.deleteRecursively(new File(shardLocation, "_state")); + } + } + + private 
void pre019Upgrade() throws Exception { + /* + * One-time conversion of the old shards state: every node data location's "_state" directory is + * searched for "shards-<n>" files, the highest-numbered one that can be read is chosen, each shard + * entry it holds is rewritten through writeShardState, the chosen file is renamed to + * "backup-<name>" and the remaining "shards-*" files are deleted. Does nothing when no such file + * is found. + */ + long index = -1; + File latest = null; + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateLocation = new File(dataLocation, "_state"); + if (!stateLocation.exists()) { + continue; + } + File[] stateFiles = stateLocation.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + if (logger.isTraceEnabled()) { + logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]"); + } + String name = stateFile.getName(); + if (!name.startsWith("shards-")) { + continue; + } + long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); + if (fileIndex >= index) { + // try and read the meta data + try { + byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile)); + if (data.length == 0) { + logger.debug("[find_latest_state]: not data for [" + name + "], ignoring..."); + /* an empty file holds no shard state; skip it so it cannot be picked over an older, readable file */ + continue; + } + /* parsed here only to check that the file is readable; the result is discarded and the chosen file is read again below */ + pre09ReadState(data); + index = fileIndex; + latest = stateFile; + } catch (IOException e) { + logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e); + } + } + } + } + if (latest == null) { + return; + } + + logger.info("found old shards state, loading started shards from [{}] and converting to new shards state locations...", latest.getAbsolutePath()); + Map<ShardId, ShardStateInfo> shardsState = pre09ReadState(Streams.copyToByteArray(new FileInputStream(latest))); + + for (Map.Entry<ShardId, ShardStateInfo> entry : shardsState.entrySet()) { + writeShardState("upgrade", entry.getKey(), entry.getValue(), null); + } + + // rename shards state to backup state + File backupFile = new File(latest.getParentFile(), "backup-" + latest.getName()); + if (!latest.renameTo(backupFile)) { + throw new IOException("failed to rename old state to backup state [" + latest.getAbsolutePath() + "]"); + } + + // delete all other shards state files + for (File dataLocation : nodeEnv.nodeDataLocations()) { + File stateLocation = new File(dataLocation, "_state"); + if (!stateLocation.exists()) { + 
continue; + } + File[] stateFiles = stateLocation.listFiles(); + if (stateFiles == null) { + continue; + } + for (File stateFile : stateFiles) { + if (logger.isTraceEnabled()) { + logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]"); + } + String name = stateFile.getName(); + if (!name.startsWith("shards-")) { + continue; + } + stateFile.delete(); + } + } + + logger.info("conversion to new shards state location and format done, backup create at [{}]", backupFile.getAbsolutePath()); + } + + /** + * Parses the old shards state format: JSON content (LZF-compressed or plain) whose "shards" array + * holds one object per shard with "index", "id" and "version" fields. + * + * @param data raw bytes of the old shards state + * @return a map from shard id to its state info; empty when the content yields no token at all + * @throws IOException propagated from the underlying parser + */ + private Map<ShardId, ShardStateInfo> pre09ReadState(byte[] data) throws IOException { + XContentParser parser = null; + try { + Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap(); + + if (LZF.isCompressed(data)) { + BytesStreamInput siBytes = new BytesStreamInput(data, false); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); + } else { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + } + + String currentFieldName = null; + XContentParser.Token token = parser.nextToken(); + if (token == null) { + // no data... 
+ return shardsState; + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + if ("shards".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.START_OBJECT) { + String shardIndex = null; + int shardId = -1; + long version = -1; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("index".equals(currentFieldName)) { + shardIndex = parser.text(); + } else if ("id".equals(currentFieldName)) { + shardId = parser.intValue(); + } else if ("version".equals(currentFieldName)) { + version = parser.longValue(); + } + } + } + /* fields absent from the entry keep their defaults (null index, -1 id, -1 version) */ + shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version)); + } + } + } + } + } + return shardsState; + } finally { + if (parser != null) { + parser.close(); + } + } + } +} diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java new file mode 100644 index 00000000000..764ba936ad2 --- /dev/null +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java @@ -0,0 +1,31 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local.state.shards; + +/** + * Holds the version recorded for a single shard's state. The only field is final and assigned in the constructor. + */ +public class ShardStateInfo { + + /** The shard state version. */ + public final long version; + + /** + * @param version the shard state version to hold + */ + public ShardStateInfo(long version) { + this.version = version; + } +} diff --git a/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java similarity index 94% rename from src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java rename to src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java index 24e86aba66b..b0cca8da8a6 100644 --- a/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.elasticsearch.gateway.local; +package org.elasticsearch.gateway.local.state.shards; import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; @@ -48,15 +48,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray; */ public class TransportNodesListGatewayStartedShards extends TransportNodesOperationAction { - private LocalGateway gateway; + private LocalGatewayShardsState shardsState; @Inject public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) { super(settings, clusterName, threadPool, clusterService, transportService); } - TransportNodesListGatewayStartedShards initGateway(LocalGateway gateway) { - this.gateway = gateway; + TransportNodesListGatewayStartedShards initGateway(LocalGatewayShardsState shardsState) { + this.shardsState = shardsState; return this; } @@ -117,12 +117,11 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat @Override protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException { - LocalGatewayStartedShards startedShards = gateway.currentStartedShards(); - if (startedShards != null) { - for (Map.Entry entry : startedShards.shards().entrySet()) { + Map shardsStateInfo = shardsState.currentStartedShards(); + if (shardsStateInfo != null) { + for (Map.Entry entry : shardsStateInfo.entrySet()) { if (entry.getKey().equals(request.shardId)) { - assert entry.getValue() != null; - return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue()); + return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue().version); } } } diff --git a/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java b/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java index 
fcd3124eeff..557a2d784f2 100644 --- a/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java +++ b/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java @@ -254,8 +254,8 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); cleanAndCloseNodes(); - Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build()); - Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build()); + Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build()); + Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build()); node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); node1.client().admin().indices().prepareFlush().execute().actionGet();