also store the fact that a shard is primary or not in the shard state

This commit is contained in:
Shay Banon 2012-01-23 21:57:39 +02:00
parent 9d99c54e2c
commit 50aad23fac
2 changed files with 27 additions and 12 deletions

View File

@ -118,7 +118,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
// our node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version()));
newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version(), shardRouting.primary()));
}
}
}
@ -168,7 +168,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
for (ShardId shardId : shardIds) {
long highestShardVersion = -1;
File highestShardFile = null;
ShardStateInfo highestShardState = null;
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state");
if (!shardStateDir.exists() || !shardStateDir.isDirectory()) {
@ -191,10 +191,14 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
long readStateVersion = readShardState(data);
assert readStateVersion == version;
ShardStateInfo readState = readShardState(data);
if (readState == null) {
logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id());
continue;
}
assert readState.version == version;
highestShardState = readState;
highestShardVersion = version;
highestShardFile = stateFile;
}
} catch (Exception e) {
logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id());
@ -202,11 +206,11 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
}
}
// did we find a state file?
if (highestShardFile == null) {
if (highestShardState == null) {
continue;
}
shardsState.put(shardId, new ShardStateInfo(highestShardVersion));
shardsState.put(shardId, highestShardState);
// update the global version
if (highestShardVersion > highestVersion) {
@ -219,15 +223,17 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
}
}
private long readShardState(byte[] data) throws Exception {
@Nullable
private ShardStateInfo readShardState(byte[] data) throws Exception {
XContentParser parser = null;
try {
parser = XContentHelper.createParser(data, 0, data.length);
XContentParser.Token token = parser.nextToken();
if (token == null) {
return -1;
return null;
}
long version = -1;
Boolean primary = null;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -235,10 +241,12 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("primary".equals(currentFieldName)) {
primary = parser.booleanValue();
}
}
}
return version;
return new ShardStateInfo(version, primary);
} finally {
if (parser != null) {
parser.close();
@ -425,7 +433,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
}
}
}
shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version));
shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version, null));
}
}
}

View File

@ -19,13 +19,20 @@
package org.elasticsearch.gateway.local.state.shards;
import org.elasticsearch.common.Nullable;
/**
*/
public class ShardStateInfo {
public final long version;
public ShardStateInfo(long version) {
// can be null if we don't know...
@Nullable
public final Boolean primary;
public ShardStateInfo(long version, Boolean primary) {
this.version = version;
this.primary = primary;
}
}