From f78db1f1d32369da4f8b35093894949e4936605a Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 14 Jul 2015 18:51:05 +0200 Subject: [PATCH] Unique allocation id Add a unique allocation id for a shard, helping to uniquely identify a specific allocation taking place to a node. A special case is relocation, where a transient relocationId is kept around to make sure the target initializing shard (when using RoutingNodes) is using it for its id, and when relocation is done, the transient relocationId becomes the actual id of it. closes #12242 --- .../cluster/routing/AllocationId.java | 129 ++++++++++++++++++ .../cluster/routing/ShardRouting.java | 41 +++++- .../cluster/routing/AllocationIdTests.java | 124 +++++++++++++++++ .../cluster/routing/TestShardRouting.java | 23 +++- 4 files changed, 307 insertions(+), 10 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java new file mode 100644 index 00000000000..55e7ca729b7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Uniquely identifies an allocation. An allocation is a shard moving from unassigned to initializing, + * or relocation. + *

+ * Relocation is a special case, where the origin shard is relocating with a relocationId and same id, and + * the target shard (only materialized in RoutingNodes) is initializing with the id set to the origin shard + * relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar + * behavior to how ShardRouting#currentNodeId is used. + */ +public class AllocationId { + + private final String id; + private final String relocationId; + + AllocationId(StreamInput in) throws IOException { + this.id = in.readString(); + this.relocationId = in.readOptionalString(); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.id); + out.writeOptionalString(this.relocationId); + } + + private AllocationId(String id, String relocationId) { + this.id = id; + this.relocationId = relocationId; + } + + /** + * Creates a new allocation id for initializing allocation. + */ + public static AllocationId newInitializing() { + return new AllocationId(Strings.randomBase64UUID(), null); + } + + /** + * Creates a new allocation id for the target initializing shard that is the result + * of a relocation. + */ + public static AllocationId newTargetRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() != null; + return new AllocationId(allocationId.getRelocationId(), null); + } + + /** + * Creates a new allocation id for a shard that moves to be relocated, populating + * the transient holder for relocationId. + */ + public static AllocationId newRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() == null; + return new AllocationId(allocationId.getId(), Strings.randomBase64UUID()); + } + + /** + * Creates a new allocation id representing a cancelled relocation. + */ + public static AllocationId cancelRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() != null; + return new AllocationId(allocationId.getId(), null); + } + + /** + * Creates a new allocation id finalizing a relocation, moving the transient + * relocation id to be the actual id. + */ + public static AllocationId finishRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() != null; + return new AllocationId(allocationId.getRelocationId(), null); + } + + /** + * The allocation id uniquely identifying an allocation, note, if it is relocation + * the {@link #getRelocationId()} need to be taken into account as well. + */ + public String getId() { + return id; + } + + /** + * The transient relocation id holding the unique id that is used for relocation. + */ + public String getRelocationId() { + return relocationId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + AllocationId that = (AllocationId) o; + if (!id.equals(that.id)) return false; + return !(relocationId != null ? !relocationId.equals(that.relocationId) : that.relocationId != null); + + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + (relocationId != null ? relocationId.hashCode() : 0); + return result; + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 37b1795ebfa..7165fda8a4d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -44,10 +44,11 @@ public final class ShardRouting implements Streamable, ToXContent { private boolean primary; private ShardRoutingState state; private long version; - private transient ShardId shardIdentifier; private RestoreSource restoreSource; private UnassignedInfo unassignedInfo; + private AllocationId allocationId; private final transient List asList; + private transient ShardId shardIdentifier; private boolean frozen = false; private ShardRouting() { @@ -59,7 +60,7 @@ public final class ShardRouting implements Streamable, ToXContent { } public ShardRouting(ShardRouting copy, long version) { - this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), true); + this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true); } /** @@ -68,7 +69,7 @@ public final class ShardRouting implements Streamable, ToXContent { */ ShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version, - UnassignedInfo unassignedInfo, boolean internal) { + UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal) { this.index = index; this.shardId = shardId; this.currentNodeId = currentNodeId; @@ -79,11 +80,13 @@ public final class ShardRouting implements Streamable, ToXContent { this.version = version; this.restoreSource = restoreSource; this.unassignedInfo = unassignedInfo; + this.allocationId = allocationId; assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta"; if (!internal) { assert state == ShardRoutingState.UNASSIGNED; assert currentNodeId == null; assert relocatingNodeId == null; + assert allocationId == null; } } @@ -91,7 +94,7 @@ public final class ShardRouting implements Streamable, ToXContent { * Creates a new unassigned shard. */ public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) { - return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, true); + return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true); } /** @@ -201,7 +204,8 @@ public final class ShardRouting implements Streamable, ToXContent { */ public ShardRouting buildTargetRelocatingShard() { assert relocating(); - return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo, true); + return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo, + AllocationId.newTargetRelocation(allocationId), true); } /** @@ -220,6 +224,14 @@ public final class ShardRouting implements Streamable, ToXContent { return unassignedInfo; } + /** + * An id that uniquely identifies an allocation. + */ + @Nullable + public AllocationId allocationId() { + return this.allocationId; + } + /** * Returns true iff this shard is a primary. */ @@ -287,6 +299,9 @@ public final class ShardRouting implements Streamable, ToXContent { if (in.readBoolean()) { unassignedInfo = new UnassignedInfo(in); } + if (in.readBoolean()) { + allocationId = new AllocationId(in); + } freeze(); } @@ -332,6 +347,12 @@ public final class ShardRouting implements Streamable, ToXContent { } else { out.writeBoolean(false); } + if (allocationId != null) { + out.writeBoolean(true); + allocationId.writeTo(out); + } else { + out.writeBoolean(false); + } } @Override @@ -355,6 +376,7 @@ public final class ShardRouting implements Streamable, ToXContent { currentNodeId = null; relocatingNodeId = null; this.unassignedInfo = unassignedInfo; + allocationId = null; } /** @@ -367,6 +389,7 @@ public final class ShardRouting implements Streamable, ToXContent { assert relocatingNodeId == null : this; state = ShardRoutingState.INITIALIZING; currentNodeId = nodeId; + allocationId = AllocationId.newInitializing(); } /** @@ -380,6 +403,7 @@ public final class ShardRouting implements Streamable, ToXContent { assert state == ShardRoutingState.STARTED : this; state = ShardRoutingState.RELOCATING; this.relocatingNodeId = relocatingNodeId; + this.allocationId = AllocationId.newRelocation(allocationId); } /** @@ -395,6 +419,7 @@ public final class ShardRouting implements Streamable, ToXContent { state = ShardRoutingState.STARTED; relocatingNodeId = null; + allocationId = AllocationId.cancelRelocation(allocationId); } /** @@ -405,6 +430,7 @@ public final class ShardRouting implements Streamable, ToXContent { assert state == ShardRoutingState.STARTED; version++; state = ShardRoutingState.INITIALIZING; + allocationId = AllocationId.newInitializing(); } /** @@ -418,8 +444,11 @@ public final class ShardRouting implements Streamable, ToXContent { assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this; relocatingNodeId = null; restoreSource = null; - state = ShardRoutingState.STARTED; unassignedInfo = null; // we keep the unassigned data until the shard is started + if (state == ShardRoutingState.RELOCATING) { + allocationId = AllocationId.finishRelocation(allocationId); + } + state = ShardRoutingState.STARTED; } /** diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java new file mode 100644 index 00000000000..74a8b2fde46 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import static org.hamcrest.Matchers.*; + +/** + */ +public class AllocationIdTests extends ElasticsearchTestCase { + + @Test + public void testShardToStarted() { + logger.info("-- create unassigned shard"); + ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); + assertThat(shard.allocationId(), nullValue()); + + logger.info("-- initialize the shard"); + shard.initialize("node1"); + AllocationId allocationId = shard.allocationId(); + assertThat(allocationId, notNullValue()); + assertThat(allocationId.getId(), notNullValue()); + assertThat(allocationId.getRelocationId(), nullValue()); + + logger.info("-- start the shard"); + shard.moveToStarted(); + assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); + allocationId = shard.allocationId(); + assertThat(allocationId.getId(), notNullValue()); + assertThat(allocationId.getRelocationId(), nullValue()); + } + + @Test + public void testSuccessfulRelocation() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); + shard.initialize("node1"); + shard.moveToStarted(); + + AllocationId allocationId = shard.allocationId(); + logger.info("-- relocate the shard"); + shard.relocate("node2"); + assertThat(shard.allocationId(), not(equalTo(allocationId))); + assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); + assertThat(shard.allocationId().getRelocationId(), notNullValue()); + allocationId = shard.allocationId(); + + ShardRouting target = shard.buildTargetRelocatingShard(); + assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId())); + assertThat(target.allocationId().getRelocationId(), nullValue()); + + logger.info("-- finalize the relocation"); + shard.moveToStarted(); + assertThat(shard.allocationId().getId(), equalTo(target.allocationId().getId())); + assertThat(shard.allocationId().getRelocationId(), nullValue()); + } + + @Test + public void testCancelRelocation() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); + shard.initialize("node1"); + shard.moveToStarted(); + + AllocationId allocationId = shard.allocationId(); + logger.info("-- relocate the shard"); + shard.relocate("node2"); + assertThat(shard.allocationId(), not(equalTo(allocationId))); + assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); + assertThat(shard.allocationId().getRelocationId(), notNullValue()); + allocationId = shard.allocationId(); + + logger.info("-- cancel relocation"); + shard.cancelRelocation(); + assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); + assertThat(shard.allocationId().getRelocationId(), nullValue()); + } + + @Test + public void testMoveToUnassigned() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); + shard.initialize("node1"); + shard.moveToStarted(); + + logger.info("-- move to unassigned"); + shard.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null)); + assertThat(shard.allocationId(), nullValue()); + } + + @Test + public void testReinitializing() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); + shard.initialize("node1"); + shard.moveToStarted(); + AllocationId allocationId = shard.allocationId(); + + logger.info("-- reinitializing shard"); + shard.reinitializeShard(); + assertThat(shard.allocationId().getId(), notNullValue()); + assertThat(shard.allocationId().getRelocationId(), nullValue()); + assertThat(shard.allocationId().getId(), not(equalTo(allocationId.getId()))); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java index a0b1ff63969..99d3d8b3a0b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -26,20 +26,35 @@ package org.elasticsearch.cluster.routing; public class TestShardRouting { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, null, true); + return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, null, buildAllocationId(state), true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version, UnassignedInfo unassignedInfo) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true); + } + + private static AllocationId buildAllocationId(ShardRoutingState state) { + switch (state) { + case UNASSIGNED: + return null; + case INITIALIZING: + case STARTED: + return AllocationId.newInitializing(); + case RELOCATING: + AllocationId allocationId = AllocationId.newInitializing(); + return AllocationId.newRelocation(allocationId); + default: + throw new IllegalStateException("illegal state"); + } } }