Local Gateway: Move shard state to be stored under each shard, and not globally under _state, closes #1618.

This commit is contained in:
Shay Banon 2012-01-18 01:08:35 +02:00
parent 801c709b42
commit 534f487de3
9 changed files with 568 additions and 435 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.env;
import com.google.common.collect.Sets;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NativeFSLockFactory;
import org.elasticsearch.ElasticSearchIllegalStateException;
@ -34,6 +35,7 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
/**
*
@ -175,6 +177,36 @@ public class NodeEnvironment extends AbstractComponent {
return shardLocations;
}
public Set<ShardId> findAllShardIds() throws Exception {
if (nodeFiles == null || locks == null) {
throw new ElasticSearchIllegalStateException("node is not configured to store local location");
}
Set<ShardId> shardIds = Sets.newHashSet();
for (File indicesLocation : nodeIndicesLocations) {
File[] indicesList = indicesLocation.listFiles();
if (indicesList == null) {
continue;
}
for (File indexLocation : indicesList) {
if (!indexLocation.isDirectory()) {
continue;
}
String indexName = indexLocation.getName();
File[] shardsList = indexLocation.listFiles();
if (shardsList == null) {
continue;
}
for (File shardLocation : shardsList) {
if (!shardLocation.isDirectory()) {
continue;
}
shardIds.add(new ShardId(indexName, Integer.parseInt(shardLocation.getName())));
}
}
}
return shardIds;
}
public void close() {
if (locks != null) {
for (Lock lock : locks) {

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.inject.Inject;
@ -42,8 +41,8 @@ import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState;
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import org.elasticsearch.index.shard.ShardId;
import java.io.*;
import java.util.Set;
@ -58,24 +57,19 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
*/
public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
private boolean requiresStatePersistence;
private final ClusterService clusterService;
private final NodeEnvironment nodeEnv;
private final LocalGatewayShardsState shardsState;
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final TransportNodesListGatewayStartedShards listGatewayStartedShards;
private final boolean compress;
private final boolean prettyPrint;
private volatile LocalGatewayMetaState currentMetaState;
private volatile LocalGatewayStartedShards currentStartedShards;
private volatile ExecutorService executor;
private volatile boolean initialized = false;
@ -83,13 +77,14 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private volatile boolean metaDataPersistedAtLeastOnce = false;
@Inject
public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv,
TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayStartedShards listGatewayStartedShards) {
public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, LocalGatewayShardsState shardsState,
TransportNodesListGatewayMetaState listGatewayMetaState) {
super(settings);
this.clusterService = clusterService;
this.nodeEnv = nodeEnv;
this.listGatewayMetaState = listGatewayMetaState.initGateway(this);
this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this);
this.shardsState = shardsState;
this.compress = componentSettings.getAsBoolean("compress", true);
this.prettyPrint = componentSettings.getAsBoolean("pretty", false);
@ -105,16 +100,11 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
return this.currentMetaState;
}
public LocalGatewayStartedShards currentStartedShards() {
lazyInitialize();
return this.currentStartedShards;
}
@Override
protected void doStart() throws ElasticSearchException {
this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway"));
lazyInitialize();
clusterService.add(this);
clusterService.addLast(this);
}
@Override
@ -178,10 +168,6 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!requiresStatePersistence) {
return;
}
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (event.state().blocks().disableStatePersistence()) {
return;
@ -192,50 +178,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
executor.execute(new LoggingRunnable(logger, new PersistMetaData(event)));
}
if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) {
LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder();
if (currentStartedShards != null) {
builder.state(currentStartedShards);
}
builder.version(event.state().version());
boolean changed = false;
// remove from the current state all the shards that are primary and started somewhere, we won't need them anymore
// and if they are still here, we will add them in the next phase
// Also note, this works well when closing an index, since a closed index will have no routing shards entries
// so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed)
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) {
changed |= builder.remove(indexShardRoutingTable.shardId());
}
}
}
// remove deleted indices from the started shards
for (ShardId shardId : builder.build().shards().keySet()) {
if (!event.state().metaData().hasIndex(shardId.index().name())) {
changed |= builder.remove(shardId);
}
}
// now, add all the ones that are active and on this node
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
if (routingNode != null) {
// out node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
changed |= builder.put(shardRouting.shardId(), shardRouting.version());
}
}
}
// only write if something changed...
if (changed) {
final LocalGatewayStartedShards stateToWrite = builder.build();
executor.execute(new LoggingRunnable(logger, new PersistShards(event, stateToWrite)));
}
}
shardsState.clusterChanged(event);
}
/**
@ -251,84 +194,21 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
initialized = true;
// if this is not a possible master node or data node, bail, we won't save anything here...
if (!clusterService.localNode().masterNode() && !clusterService.localNode().dataNode()) {
requiresStatePersistence = false;
} else {
// create the location where the state will be stored
// TODO: we might want to persist states on all data locations
requiresStatePersistence = true;
if (clusterService.localNode().masterNode()) {
try {
File latest = findLatestMetaStateVersion();
if (latest != null) {
logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath());
this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest)));
} else {
logger.debug("[find_latest_state]: no metadata state loaded");
}
} catch (Exception e) {
logger.warn("failed to read local state (metadata)", e);
}
}
if (clusterService.localNode().dataNode()) {
try {
File latest = findLatestStartedShardsVersion();
if (latest != null) {
logger.debug("[find_latest_state]: loading started shards from [{}]", latest.getAbsolutePath());
this.currentStartedShards = readStartedShards(Streams.copyToByteArray(new FileInputStream(latest)));
} else {
logger.debug("[find_latest_state]: no started shards loaded");
}
} catch (Exception e) {
logger.warn("failed to read local state (started shards)", e);
if (clusterService.localNode().masterNode()) {
try {
File latest = findLatestMetaStateVersion();
if (latest != null) {
logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath());
this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest)));
} else {
logger.debug("[find_latest_state]: no metadata state loaded");
}
} catch (Exception e) {
logger.warn("failed to read local state (metadata)", e);
}
}
}
private File findLatestStartedShardsVersion() throws IOException {
long index = -1;
File latest = null;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
File[] stateFiles = stateLocation.listFiles();
if (stateFiles == null) {
continue;
}
for (File stateFile : stateFiles) {
if (logger.isTraceEnabled()) {
logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
}
String name = stateFile.getName();
if (!name.startsWith("shards-")) {
continue;
}
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
if (data.length == 0) {
logger.debug("[find_latest_state]: not data for [" + name + "], ignoring...");
}
readStartedShards(data);
index = fileIndex;
latest = stateFile;
} catch (IOException e) {
logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e);
}
}
}
}
return latest;
}
private File findLatestMetaStateVersion() throws IOException {
long index = -1;
File latest = null;
@ -388,24 +268,6 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
}
private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOException {
XContentParser parser = null;
try {
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
return LocalGatewayStartedShards.Builder.fromXContent(parser);
} finally {
if (parser != null) {
parser.close();
}
}
}
class PersistMetaData implements Runnable {
private final ClusterChangedEvent event;
@ -493,86 +355,4 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
}
}
class PersistShards implements Runnable {
private final ClusterChangedEvent event;
private final LocalGatewayStartedShards stateToWrite;
public PersistShards(ClusterChangedEvent event, LocalGatewayStartedShards stateToWrite) {
this.event = event;
this.stateToWrite = stateToWrite;
}
@Override
public void run() {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
StreamOutput streamOutput;
try {
if (compress) {
streamOutput = cachedEntry.cachedLZFBytes();
} else {
streamOutput = cachedEntry.cachedBytes();
}
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
} catch (Exception e) {
logger.warn("failed to serialize local gateway shard states", e);
return;
}
boolean serializedAtLeastOnce = false;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
FileSystemUtils.mkdirs(stateLocation);
}
File stateFile = new File(stateLocation, "shards-" + event.state().version());
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
fos.getChannel().force(true);
serializedAtLeastOnce = true;
} catch (Exception e) {
logger.warn("failed to write local gateway shards state to {}", e, stateFile);
} finally {
Closeables.closeQuietly(fos);
}
}
if (serializedAtLeastOnce) {
currentStartedShards = stateToWrite;
// delete all the other files
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
File[] files = stateLocation.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}
}
}
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
}
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.gateway.local.state.shards.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

View File

@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState;
import org.elasticsearch.gateway.local.state.shards.TransportNodesListGatewayStartedShards;
/**
*
@ -33,6 +35,7 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu
@Override
protected void configure() {
bind(Gateway.class).to(LocalGateway.class).asEagerSingleton();
bind(LocalGatewayShardsState.class).asEagerSingleton();
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
}

View File

@ -1,185 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Map;
/**
*
*/
public class LocalGatewayStartedShards {
private final long version;
private final ImmutableMap<ShardId, Long> shards;
public LocalGatewayStartedShards(long version, Map<ShardId, Long> shards) {
this.version = version;
this.shards = ImmutableMap.copyOf(shards);
}
public long version() {
return version;
}
public ImmutableMap<ShardId, Long> shards() {
return shards;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long version;
private Map<ShardId, Long> shards = Maps.newHashMap();
public Builder state(LocalGatewayStartedShards state) {
this.version = state.version();
this.shards.putAll(state.shards);
return this;
}
public Builder version(long version) {
this.version = version;
return this;
}
/**
* Returns <tt>true</tt> if something really changed.
*/
public boolean remove(ShardId shardId) {
return shards.remove(shardId) != null;
}
/**
* Returns <tt>true</tt> if something really changed.
*/
public boolean put(ShardId shardId, long version) {
Long lVersion = shards.get(shardId);
if (lVersion != null && lVersion == version) {
return false;
}
this.shards.put(shardId, version);
return true;
}
public LocalGatewayStartedShards build() {
return new LocalGatewayStartedShards(version, shards);
}
public static void toXContent(LocalGatewayStartedShards state, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("state");
builder.field("version", state.version());
builder.startArray("shards");
for (Map.Entry<ShardId, Long> entry : state.shards.entrySet()) {
builder.startObject();
builder.field("index", entry.getKey().index().name());
builder.field("id", entry.getKey().id());
builder.field("version", entry.getValue());
builder.endObject();
}
builder.endArray();
builder.endObject();
}
public static LocalGatewayStartedShards fromXContent(XContentParser parser) throws IOException {
Builder builder = new Builder();
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token == null) {
// no data...
return builder.build();
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if ("shards".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
String shardIndex = null;
int shardId = -1;
long version = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
shardIndex = parser.text();
} else if ("id".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("version".equals(currentFieldName)) {
version = parser.longValue();
}
}
}
builder.shards.put(new ShardId(shardIndex, shardId), version);
}
}
}
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
builder.version = parser.longValue();
}
}
}
return builder.build();
}
public static LocalGatewayStartedShards readFrom(StreamInput in) throws IOException {
LocalGatewayStartedShards.Builder builder = new Builder();
builder.version = in.readLong();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.shards.put(ShardId.readShardId(in), in.readLong());
}
return builder.build();
}
public static void writeTo(LocalGatewayStartedShards state, StreamOutput out) throws IOException {
out.writeLong(state.version());
out.writeVInt(state.shards.size());
for (Map.Entry<ShardId, Long> entry : state.shards.entrySet()) {
entry.getKey().writeTo(out);
out.writeLong(entry.getValue());
}
}
}
}

View File

@ -0,0 +1,472 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local.state.shards;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
*/
public class LocalGatewayShardsState extends AbstractComponent implements ClusterStateListener {
private final NodeEnvironment nodeEnv;
private final ClusterService clusterService;
private volatile boolean initialized = false;
private volatile Map<ShardId, ShardStateInfo> currentState = Maps.newHashMap();
@Inject
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, ClusterService clusterService, TransportNodesListGatewayStartedShards listGatewayStartedShards) {
super(settings);
this.nodeEnv = nodeEnv;
this.clusterService = clusterService;
listGatewayStartedShards.initGateway(this);
}
public Map<ShardId, ShardStateInfo> currentStartedShards() {
lazyInitialize();
return this.currentState;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
return;
}
if (!event.state().nodes().localNode().dataNode()) {
return;
}
if (!event.routingTableChanged()) {
return;
}
Map<ShardId, ShardStateInfo> newState = Maps.newHashMap();
newState.putAll(this.currentState);
// remove from the current state all the shards that are completely started somewhere, we won't need them anymore
// and if they are still here, we will add them in the next phase
// Also note, this works well when closing an index, since a closed index will have no routing shards entries
// so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed)
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) {
newState.remove(indexShardRoutingTable.shardId());
}
}
}
// remove deleted indices from the started shards
for (ShardId shardId : currentState.keySet()) {
if (!event.state().metaData().hasIndex(shardId.index().name())) {
newState.remove(shardId);
}
}
// now, add all the ones that are active and on this node
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
if (routingNode != null) {
// our node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version()));
}
}
}
// go over the write started shards if needed
for (Iterator<Map.Entry<ShardId, ShardStateInfo>> it = newState.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<ShardId, ShardStateInfo> entry = it.next();
ShardId shardId = entry.getKey();
ShardStateInfo shardStateInfo = entry.getValue();
String writeReason = null;
ShardStateInfo currentShardStateInfo = currentState.get(shardId);
if (currentShardStateInfo == null) {
writeReason = "freshly started, version [" + shardStateInfo.version + "]";
} else if (currentShardStateInfo.version != shardStateInfo.version) {
writeReason = "version changed from [" + currentShardStateInfo.version + "] to [" + shardStateInfo.version + "]";
}
// we update the write reason if we really need to write a new one...
if (writeReason == null) {
continue;
}
try {
writeShardState(writeReason, shardId, shardStateInfo, currentShardStateInfo);
} catch (Exception e) {
// we failed to write the shard state, remove it from our builder, we will try and write
// it next time...
it.remove();
}
}
// now, go over the current ones and delete ones that are not in the new one
for (Map.Entry<ShardId, ShardStateInfo> entry : currentState.entrySet()) {
ShardId shardId = entry.getKey();
if (!newState.containsKey(shardId)) {
deleteShardState(shardId);
}
}
this.currentState = newState;
}
private synchronized void lazyInitialize() {
if (initialized) {
return;
}
initialized = true;
// we only persist shards state for data nodes
if (!clusterService.localNode().dataNode()) {
return;
}
try {
pre019Upgrade();
long start = System.currentTimeMillis();
loadStartedShards();
logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
} catch (Exception e) {
logger.error("failed to read local state (started shards), exiting...", e);
// ugly, but, if we fail to read it, bail completely so we don't have any node corrupting the cluster
System.exit(1);
}
}
private void loadStartedShards() throws Exception {
Set<ShardId> shardIds = nodeEnv.findAllShardIds();
long highestVersion = -1;
Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
for (ShardId shardId : shardIds) {
long highestShardVersion = -1;
File highestShardFile = null;
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state");
if (!shardStateDir.exists() || !shardStateDir.isDirectory()) {
continue;
}
// now, iterate over the current versions, and find latest one
File[] stateFiles = shardStateDir.listFiles();
if (stateFiles == null) {
continue;
}
for (File stateFile : stateFiles) {
if (!stateFile.getName().startsWith("state-")) {
continue;
}
try {
long version = Long.parseLong(stateFile.getName().substring("state-".length()));
if (version > highestShardVersion) {
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
if (data.length == 0) {
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
long readStateVersion = readShardState(data);
assert readStateVersion == version;
highestShardVersion = version;
highestShardFile = stateFile;
}
} catch (Exception e) {
logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id());
}
}
}
// did we find a state file?
if (highestShardFile == null) {
continue;
}
shardsState.put(shardId, new ShardStateInfo(highestShardVersion));
// update the global version
if (highestShardVersion > highestVersion) {
highestVersion = highestShardVersion;
}
}
// update the current started shards only if there is data there...
if (highestVersion != -1) {
currentState = shardsState;
}
}
private long readShardState(byte[] data) throws Exception {
XContentParser parser = null;
try {
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
XContentParser.Token token = parser.nextToken();
if (token == null) {
return -1;
}
long version = -1;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
version = parser.longValue();
}
}
}
return version;
} finally {
if (parser != null) {
parser.close();
}
}
}
private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception {
logger.trace("[{}][{}] writing shard state, reason [{}]", shardId.index().name(), shardId.id(), reason);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes());
builder.prettyPrint();
builder.startObject();
builder.field("version", shardStateInfo.version);
builder.endObject();
builder.flush();
Exception lastFailure = null;
boolean wroteAtLeastOnce = false;
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state");
FileSystemUtils.mkdirs(shardStateDir);
File stateFile = new File(shardStateDir, "state-" + shardStateInfo.version);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
fos.getChannel().force(true);
Closeables.closeQuietly(fos);
wroteAtLeastOnce = true;
} catch (Exception e) {
lastFailure = e;
} finally {
Closeables.closeQuietly(fos);
}
}
if (!wroteAtLeastOnce) {
logger.warn("[{}][{}]: failed to write shard state", shardId.index().name(), shardId.id(), lastFailure);
throw new IOException("failed to write shard state for " + shardId, lastFailure);
}
// delete the old files
if (previousStateInfo != null) {
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File stateFile = new File(new File(shardLocation, "_state"), "state-" + previousStateInfo.version);
stateFile.delete();
}
}
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
}
private void deleteShardState(ShardId shardId) {
logger.trace("[{}][{}] delete shard state", shardId.index().name(), shardId.id());
File[] shardLocations = nodeEnv.shardLocations(shardId);
for (File shardLocation : shardLocations) {
if (!shardLocation.exists()) {
continue;
}
FileSystemUtils.deleteRecursively(new File(shardLocation, "_state"));
}
}
private void pre019Upgrade() throws Exception {
long index = -1;
File latest = null;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
File[] stateFiles = stateLocation.listFiles();
if (stateFiles == null) {
continue;
}
for (File stateFile : stateFiles) {
if (logger.isTraceEnabled()) {
logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
}
String name = stateFile.getName();
if (!name.startsWith("shards-")) {
continue;
}
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
if (data.length == 0) {
logger.debug("[find_latest_state]: not data for [" + name + "], ignoring...");
}
pre09ReadState(data);
index = fileIndex;
latest = stateFile;
} catch (IOException e) {
logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e);
}
}
}
}
if (latest == null) {
return;
}
logger.info("found old shards state, loading started shards from [{}] and converting to new shards state locations...", latest.getAbsolutePath());
Map<ShardId, ShardStateInfo> shardsState = pre09ReadState(Streams.copyToByteArray(new FileInputStream(latest)));
for (Map.Entry<ShardId, ShardStateInfo> entry : shardsState.entrySet()) {
writeShardState("upgrade", entry.getKey(), entry.getValue(), null);
}
// rename shards state to backup state
File backupFile = new File(latest.getParentFile(), "backup-" + latest.getName());
if (!latest.renameTo(backupFile)) {
throw new IOException("failed to rename old state to backup state [" + latest.getAbsolutePath() + "]");
}
// delete all other shards state files
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
File[] stateFiles = stateLocation.listFiles();
if (stateFiles == null) {
continue;
}
for (File stateFile : stateFiles) {
if (logger.isTraceEnabled()) {
logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
}
String name = stateFile.getName();
if (!name.startsWith("shards-")) {
continue;
}
stateFile.delete();
}
}
logger.info("conversion to new shards state location and format done, backup create at [{}]", backupFile.getAbsolutePath());
}
private Map<ShardId, ShardStateInfo> pre09ReadState(byte[] data) throws IOException {
XContentParser parser = null;
try {
Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token == null) {
// no data...
return shardsState;
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if ("shards".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
String shardIndex = null;
int shardId = -1;
long version = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
shardIndex = parser.text();
} else if ("id".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("version".equals(currentFieldName)) {
version = parser.longValue();
}
}
}
shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version));
}
}
}
}
}
return shardsState;
} finally {
if (parser != null) {
parser.close();
}
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local.state.shards;
/**
*/
public class ShardStateInfo {
public final long version;
public ShardStateInfo(long version) {
this.version = version;
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.gateway.local;
package org.elasticsearch.gateway.local.state.shards;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
@ -48,15 +48,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
*/
public class TransportNodesListGatewayStartedShards extends TransportNodesOperationAction<TransportNodesListGatewayStartedShards.Request, TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeRequest, TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards> {
private LocalGateway gateway;
private LocalGatewayShardsState shardsState;
@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings, clusterName, threadPool, clusterService, transportService);
}
TransportNodesListGatewayStartedShards initGateway(LocalGateway gateway) {
this.gateway = gateway;
TransportNodesListGatewayStartedShards initGateway(LocalGatewayShardsState shardsState) {
this.shardsState = shardsState;
return this;
}
@ -117,12 +117,11 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
@Override
protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException {
LocalGatewayStartedShards startedShards = gateway.currentStartedShards();
if (startedShards != null) {
for (Map.Entry<ShardId, Long> entry : startedShards.shards().entrySet()) {
Map<ShardId, ShardStateInfo> shardsStateInfo = shardsState.currentStartedShards();
if (shardsStateInfo != null) {
for (Map.Entry<ShardId, ShardStateInfo> entry : shardsStateInfo.entrySet()) {
if (entry.getKey().equals(request.shardId)) {
assert entry.getValue() != null;
return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue());
return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue().version);
}
}
}

View File

@ -254,8 +254,8 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build());
Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build());
Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build());
Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build());
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
node1.client().admin().indices().prepareFlush().execute().actionGet();