Merge pull request #14831 from ywelsch/feature/persist-allocid-shardstate

Persist allocation ID with shard state metadata on nodes
This commit is contained in:
Yannick Welsch 2015-11-23 17:44:36 +01:00
commit 7d32c88d7b
8 changed files with 123 additions and 34 deletions

View File

@ -19,11 +19,14 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.gateway.CorruptStateException;
import java.io.IOException;
@ -37,8 +40,11 @@ import java.io.IOException;
* behavior to how ShardRouting#currentNodeId is used.
*/
public class AllocationId implements ToXContent {
private static final String ID_KEY = "id";
private static final String RELOCATION_ID_KEY = "relocation_id";
private final String id;
@Nullable
private final String relocationId;
AllocationId(StreamInput in) throws IOException {
@ -148,12 +154,45 @@ public class AllocationId implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("allocation_id");
builder.field("id", id);
builder.startObject();
builder.field(ID_KEY, id);
if (relocationId != null) {
builder.field("relocation_id", relocationId);
builder.field(RELOCATION_ID_KEY, relocationId);
}
builder.endObject();
return builder;
}
public static AllocationId fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == null) { // fresh parser? move to the first real token under object
token = parser.nextToken();
}
assert token == XContentParser.Token.START_OBJECT;
String id = null;
String relocationId = null;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (ID_KEY.equals(currentFieldName)) {
id = parser.text();
} else if (RELOCATION_ID_KEY.equals(currentFieldName)) {
relocationId = parser.text();
} else {
throw new CorruptStateException("unexpected field in allocation id [" + currentFieldName + "]");
}
} else {
throw new CorruptStateException("unexpected token in allocation id [" + token.name() + "]");
}
}
if (id == null) {
throw new CorruptStateException("missing value for [id] in allocation id");
}
return new AllocationId(id, relocationId);
}
}

View File

@ -716,6 +716,7 @@ public final class ShardRouting implements Streamable, ToXContent {
restoreSource().toXContent(builder, params);
}
if (allocationId != null) {
builder.field("allocation_id");
allocationId.toXContent(builder, params);
}
if (unassignedInfo != null) {

View File

@ -1432,7 +1432,7 @@ public class IndexShard extends AbstractIndexShardComponent {
" previous version: [" + currentRouting.version() + "] current version [" + newRouting.version() + "]";
return;
}
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID());
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID(), newRouting.allocationId());
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, shardPath().getShardStatePath());
} catch (IOException e) { // this is how we used to handle it.... :(

View File

@ -20,6 +20,9 @@
package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
@ -36,17 +39,21 @@ public final class ShardStateMetaData {
private static final String SHARD_STATE_FILE_PREFIX = "state-";
private static final String PRIMARY_KEY = "primary";
private static final String VERSION_KEY = "version";
private static final String INDEX_UUID_KEY = "index_uuid" ;
private static final String INDEX_UUID_KEY = "index_uuid";
private static final String ALLOCATION_ID_KEY = "allocation_id";
public final long version;
public final String indexUUID;
public final boolean primary;
@Nullable
public final AllocationId allocationId; // can be null if we read from legacy format (see fromXContent)
public ShardStateMetaData(long version, boolean primary, String indexUUID) {
public ShardStateMetaData(long version, boolean primary, String indexUUID, AllocationId allocationId) {
assert indexUUID != null;
this.version = version;
this.primary = primary;
this.indexUUID = indexUUID;
this.allocationId = allocationId;
}
@Override
@ -69,6 +76,9 @@ public final class ShardStateMetaData {
if (indexUUID != null ? !indexUUID.equals(that.indexUUID) : that.indexUUID != null) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
return true;
}
@ -77,13 +87,14 @@ public final class ShardStateMetaData {
public int hashCode() {
int result = Long.hashCode(version);
result = 31 * result + (indexUUID != null ? indexUUID.hashCode() : 0);
result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0);
result = 31 * result + (primary ? 1 : 0);
return result;
}
@Override
public String toString() {
return "version [" + version + "], primary [" + primary + "]";
return "version [" + version + "], primary [" + primary + "], allocation [" + allocationId + "]";
}
public static final MetaDataStateFormat<ShardStateMetaData> FORMAT = new MetaDataStateFormat<ShardStateMetaData>(XContentType.JSON, SHARD_STATE_FILE_PREFIX) {
@ -100,6 +111,7 @@ public final class ShardStateMetaData {
builder.field(VERSION_KEY, shardStateMetaData.version);
builder.field(PRIMARY_KEY, shardStateMetaData.primary);
builder.field(INDEX_UUID_KEY, shardStateMetaData.indexUUID);
builder.field(ALLOCATION_ID_KEY, shardStateMetaData.allocationId);
}
@Override
@ -112,6 +124,7 @@ public final class ShardStateMetaData {
Boolean primary = null;
String currentFieldName = null;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
AllocationId allocationId = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -125,6 +138,12 @@ public final class ShardStateMetaData {
} else {
throw new CorruptStateException("unexpected field in shard state [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (ALLOCATION_ID_KEY.equals(currentFieldName)) {
allocationId = AllocationId.fromXContent(parser);
} else {
throw new CorruptStateException("unexpected object in shard state [" + currentFieldName + "]");
}
} else {
throw new CorruptStateException("unexpected token in shard state [" + token.name() + "]");
}
@ -135,7 +154,7 @@ public final class ShardStateMetaData {
if (version == -1) {
throw new CorruptStateException("missing value for [version] in shard state");
}
return new ShardStateMetaData(version, primary, indexUUID);
return new ShardStateMetaData(version, primary, indexUUID, allocationId);
}
};
}

View File

@ -19,9 +19,18 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/**
*/
@ -113,4 +122,14 @@ public class AllocationIdTests extends ESTestCase {
assertThat(shard.allocationId().getRelocationId(), nullValue());
assertThat(shard.allocationId().getId(), not(equalTo(allocationId.getId())));
}
public void testSerialization() throws IOException {
AllocationId allocationId = AllocationId.newInitializing();
if (randomBoolean()) {
allocationId = AllocationId.newRelocation(allocationId);
}
BytesReference bytes = allocationId.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes();
AllocationId parsedAllocationId = AllocationId.fromXContent(XContentFactory.xContent(XContentType.JSON).createParser(bytes));
assertEquals(allocationId, parsedAllocationId);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
@ -88,7 +89,7 @@ public class MultiDataPathUpgraderTests extends ESTestCase {
}
}
++metaStateVersion;
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(metaStateVersion, true, uuid), metaStateVersion, shardDataPaths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(metaStateVersion, true, uuid, AllocationId.newInitializing()), metaStateVersion, shardDataPaths);
}
final Path path = randomFrom(shardDataPaths);
ShardPath targetPath = new ShardPath(false, path, path, uuid, new ShardId("foo", 0));
@ -199,7 +200,7 @@ public class MultiDataPathUpgraderTests extends ESTestCase {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
String uuid = Strings.randomBase64UUID();
final ShardId shardId = new ShardId("foo", 0);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid), 1, nodeEnvironment.availableShardPaths(shardId));
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid, AllocationId.newInitializing()), 1, nodeEnvironment.availableShardPaths(shardId));
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
boolean multiDataPaths = nodeEnvironment.nodeDataPaths().length > 1;
boolean needsUpgrading = helper.needsUpgrading(shardId);
@ -267,7 +268,7 @@ public class MultiDataPathUpgraderTests extends ESTestCase {
}
};
String uuid = Strings.randomBase64UUID();
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid), 1, paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid, AllocationId.newInitializing()), 1, paths);
final ShardPath shardPath = helper.pickShardPath(new ShardId("foo", 0));
assertEquals(expectedPath, shardPath.getDataPath());
assertEquals(expectedPath, shardPath.getShardStatePath());

View File

@ -133,17 +133,18 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardId id = new ShardId("foo", 1);
long version = between(1, Integer.MAX_VALUE / 2);
boolean primary = randomBoolean();
ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "foo");
AllocationId allocationId = randomAllocationId();
ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "foo", allocationId);
write(state1, env.availableShardPaths(id));
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(id));
assertEquals(shardStateMetaData, state1);
ShardStateMetaData state2 = new ShardStateMetaData(version, primary, "foo");
ShardStateMetaData state2 = new ShardStateMetaData(version, primary, "foo", allocationId);
write(state2, env.availableShardPaths(id));
shardStateMetaData = load(logger, env.availableShardPaths(id));
assertEquals(shardStateMetaData, state1);
ShardStateMetaData state3 = new ShardStateMetaData(version + 1, primary, "foo");
ShardStateMetaData state3 = new ShardStateMetaData(version + 1, primary, "foo", allocationId);
write(state3, env.availableShardPaths(id));
shardStateMetaData = load(logger, env.availableShardPaths(id));
assertEquals(shardStateMetaData, state3);
@ -190,39 +191,38 @@ public class IndexShardTests extends ESSingleNodeTestCase {
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID()));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
routing = new ShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID()));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
routing = new ShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID()));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
// test if we still write it even if the shard is not active
ShardRouting inactiveRouting = TestShardRouting.newShardRouting(shard.shardRouting.index(), shard.shardRouting.shardId().id(), shard.shardRouting.currentNodeId(), null, null, true, ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
shard.persistMetadata(inactiveRouting, shard.shardRouting);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, getShardStateMetadata(shard));
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID()));
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
shard.updateRoutingEntry(new ShardRouting(shard.shardRouting, shard.shardRouting.version() + 1), false);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard)));
assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID()));
assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
routing = new ShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID()));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
}
public void testDeleteShardState() throws IOException {
@ -275,22 +275,31 @@ public class IndexShardTests extends ESSingleNodeTestCase {
if (shardRouting == null) {
return null;
} else {
return new ShardStateMetaData(shardRouting.version(), shardRouting.primary(), shard.indexSettings().getUUID());
return new ShardStateMetaData(shardRouting.version(), shardRouting.primary(), shard.indexSettings().getUUID(), shardRouting.allocationId());
}
}
private AllocationId randomAllocationId() {
AllocationId allocationId = AllocationId.newInitializing();
if (randomBoolean()) {
allocationId = AllocationId.newRelocation(allocationId);
}
return allocationId;
}
public void testShardStateMetaHashCodeEquals() {
ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10));
ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId());
assertEquals(meta, new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID));
assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID).hashCode());
assertEquals(meta, new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId));
assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId).hashCode());
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version + 1, meta.primary, meta.indexUUID)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo")));
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID, meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version + 1, meta.primary, meta.indexUUID, meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo", meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo", randomAllocationId())));
Set<Integer> hashCodes = new HashSet<>();
for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode
meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10));
meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId());
hashCodes.add(meta.hashCode());
}
assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1);

View File

@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.test.ESTestCase;
@ -43,7 +44,7 @@ public class ShardPathTests extends ESTestCase {
ShardId shardId = new ShardId("foo", 0);
Path[] paths = env.availableShardPaths(shardId);
Path path = randomFrom(paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, "0xDEADBEEF"), 2, path);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, "0xDEADBEEF", AllocationId.newInitializing()), 2, path);
ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.index(), settings));
assertEquals(path, shardPath.getDataPath());
assertEquals("0xDEADBEEF", shardPath.getIndexUUID());
@ -62,7 +63,7 @@ public class ShardPathTests extends ESTestCase {
Path[] paths = env.availableShardPaths(shardId);
assumeTrue("This test tests multi data.path but we only got one", paths.length > 1);
int id = randomIntBetween(1, 10);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(id, true, "0xDEADBEEF"), id, paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(id, true, "0xDEADBEEF", AllocationId.newInitializing()), id, paths);
ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.index(), settings));
fail("Expected IllegalStateException");
} catch (IllegalStateException e) {
@ -79,7 +80,7 @@ public class ShardPathTests extends ESTestCase {
Path[] paths = env.availableShardPaths(shardId);
Path path = randomFrom(paths);
int id = randomIntBetween(1, 10);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(id, true, "0xDEADBEEF"), id, path);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(id, true, "0xDEADBEEF", AllocationId.newInitializing()), id, path);
ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.index(), settings));
fail("Expected IllegalStateException");
} catch (IllegalStateException e) {
@ -133,7 +134,7 @@ public class ShardPathTests extends ESTestCase {
ShardId shardId = new ShardId("foo", 0);
Path[] paths = env.availableShardPaths(shardId);
Path path = randomFrom(paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, "0xDEADBEEF"), 2, path);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, "0xDEADBEEF", AllocationId.newInitializing()), 2, path);
ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.index(), indexSetttings));
boolean found = false;
for (Path p : env.nodeDataPaths()) {