From 50aad23facb03a7220f8c7e94e04baa943c1e281 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 23 Jan 2012 21:57:39 +0200 Subject: [PATCH] also store the fact that a shard is primary or not in the shard state --- .../state/shards/LocalGatewayShardsState.java | 30 ++++++++++++------- .../local/state/shards/ShardStateInfo.java | 9 +++++- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java index 194548faac5..c7ad004b93b 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java @@ -118,7 +118,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste // our node is not in play yet... for (MutableShardRouting shardRouting : routingNode) { if (shardRouting.active()) { - newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version())); + newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version(), shardRouting.primary())); } } } @@ -168,7 +168,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste Map shardsState = Maps.newHashMap(); for (ShardId shardId : shardIds) { long highestShardVersion = -1; - File highestShardFile = null; + ShardStateInfo highestShardState = null; for (File shardLocation : nodeEnv.shardLocations(shardId)) { File shardStateDir = new File(shardLocation, "_state"); if (!shardStateDir.exists() || !shardStateDir.isDirectory()) { @@ -191,10 +191,14 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); continue; } - long readStateVersion = readShardState(data); - assert readStateVersion == version; + ShardStateInfo readState = readShardState(data); + if (readState == null) { + logger.debug("[{}][{}]: not data for [" + stateFile.getAbsolutePath() + "], ignoring...", shardId.index().name(), shardId.id()); + continue; + } + assert readState.version == version; + highestShardState = readState; highestShardVersion = version; - highestShardFile = stateFile; } } catch (Exception e) { logger.debug("[{}][{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, shardId.index().name(), shardId.id()); @@ -202,11 +206,11 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste } } // did we find a state file? - if (highestShardFile == null) { + if (highestShardState == null) { continue; } - shardsState.put(shardId, new ShardStateInfo(highestShardVersion)); + shardsState.put(shardId, highestShardState); // update the global version if (highestShardVersion > highestVersion) { @@ -219,15 +223,17 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste } } - private long readShardState(byte[] data) throws Exception { + @Nullable + private ShardStateInfo readShardState(byte[] data) throws Exception { XContentParser parser = null; try { parser = XContentHelper.createParser(data, 0, data.length); XContentParser.Token token = parser.nextToken(); if (token == null) { - return -1; + return null; } long version = -1; + Boolean primary = null; String currentFieldName = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -235,10 +241,12 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste } else if (token.isValue()) { if ("version".equals(currentFieldName)) { version = parser.longValue(); + } else if ("primary".equals(currentFieldName)) { + primary = parser.booleanValue(); } } } - return version; + return new ShardStateInfo(version, primary); } finally { if (parser != null) { parser.close(); @@ -425,7 +433,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste } } } - shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version)); + shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version, null)); } } } diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java index 764ba936ad2..47426aa1e93 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/ShardStateInfo.java @@ -19,13 +19,20 @@ package org.elasticsearch.gateway.local.state.shards; +import org.elasticsearch.common.Nullable; + /** */ public class ShardStateInfo { public final long version; - public ShardStateInfo(long version) { + // can be null if we don't know... + @Nullable + public final Boolean primary; + + public ShardStateInfo(long version, Boolean primary) { this.version = version; + this.primary = primary; } }