diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java new file mode 100644 index 00000000000..55e7ca729b7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Uniquely identifies an allocation. An allocation is a shard moving from unassigned to initializing, + * or relocation. + *
+ * Relocation is a special case, where the origin shard is relocating with a relocationId and same id, and + * the target shard (only materialized in RoutingNodes) is initializing with the id set to the origin shard + * relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar + * behavior to how ShardRouting#currentNodeId is used. + */ +public class AllocationId { + + private final String id; + private final String relocationId; + + AllocationId(StreamInput in) throws IOException { + this.id = in.readString(); + this.relocationId = in.readOptionalString(); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.id); + out.writeOptionalString(this.relocationId); + } + + private AllocationId(String id, String relocationId) { + this.id = id; + this.relocationId = relocationId; + } + + /** + * Creates a new allocation id for initializing allocation. + */ + public static AllocationId newInitializing() { + return new AllocationId(Strings.randomBase64UUID(), null); + } + + /** + * Creates a new allocation id for the target initializing shard that is the result + * of a relocation. + */ + public static AllocationId newTargetRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() != null; + return new AllocationId(allocationId.getRelocationId(), null); + } + + /** + * Creates a new allocation id for a shard that moves to be relocated, populating + * the transient holder for relocationId. + */ + public static AllocationId newRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() == null; + return new AllocationId(allocationId.getId(), Strings.randomBase64UUID()); + } + + /** + * Creates a new allocation id representing a cancelled relocation. + */ + public static AllocationId cancelRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() != null; + return new AllocationId(allocationId.getId(), null); + } + + /** + * Creates a new allocation id finalizing a relocation, moving the transient + * relocation id to be the actual id. + */ + public static AllocationId finishRelocation(AllocationId allocationId) { + assert allocationId.getRelocationId() != null; + return new AllocationId(allocationId.getRelocationId(), null); + } + + /** + * The allocation id uniquely identifying an allocation, note, if it is relocation + * the {@link #getRelocationId()} need to be taken into account as well. + */ + public String getId() { + return id; + } + + /** + * The transient relocation id holding the unique id that is used for relocation. + */ + public String getRelocationId() { + return relocationId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + AllocationId that = (AllocationId) o; + if (!id.equals(that.id)) return false; + return !(relocationId != null ? !relocationId.equals(that.relocationId) : that.relocationId != null); + + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + (relocationId != null ? relocationId.hashCode() : 0); + return result; + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 37b1795ebfa..7165fda8a4d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -44,10 +44,11 @@ public final class ShardRouting implements Streamable, ToXContent { private boolean primary; private ShardRoutingState state; private long version; - private transient ShardId shardIdentifier; private RestoreSource restoreSource; private UnassignedInfo unassignedInfo; + private AllocationId allocationId; private final transient Listtrue
iff this shard is a primary.
*/
@@ -287,6 +299,9 @@ public final class ShardRouting implements Streamable, ToXContent {
if (in.readBoolean()) {
unassignedInfo = new UnassignedInfo(in);
}
+ if (in.readBoolean()) {
+ allocationId = new AllocationId(in);
+ }
freeze();
}
@@ -332,6 +347,12 @@ public final class ShardRouting implements Streamable, ToXContent {
} else {
out.writeBoolean(false);
}
+ if (allocationId != null) {
+ out.writeBoolean(true);
+ allocationId.writeTo(out);
+ } else {
+ out.writeBoolean(false);
+ }
}
@Override
@@ -355,6 +376,7 @@ public final class ShardRouting implements Streamable, ToXContent {
currentNodeId = null;
relocatingNodeId = null;
this.unassignedInfo = unassignedInfo;
+ allocationId = null;
}
/**
@@ -367,6 +389,7 @@ public final class ShardRouting implements Streamable, ToXContent {
assert relocatingNodeId == null : this;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
+ allocationId = AllocationId.newInitializing();
}
/**
@@ -380,6 +403,7 @@ public final class ShardRouting implements Streamable, ToXContent {
assert state == ShardRoutingState.STARTED : this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
+ this.allocationId = AllocationId.newRelocation(allocationId);
}
/**
@@ -395,6 +419,7 @@ public final class ShardRouting implements Streamable, ToXContent {
state = ShardRoutingState.STARTED;
relocatingNodeId = null;
+ allocationId = AllocationId.cancelRelocation(allocationId);
}
/**
@@ -405,6 +430,7 @@ public final class ShardRouting implements Streamable, ToXContent {
assert state == ShardRoutingState.STARTED;
version++;
state = ShardRoutingState.INITIALIZING;
+ allocationId = AllocationId.newInitializing();
}
/**
@@ -418,8 +444,11 @@ public final class ShardRouting implements Streamable, ToXContent {
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this;
relocatingNodeId = null;
restoreSource = null;
- state = ShardRoutingState.STARTED;
unassignedInfo = null; // we keep the unassigned data until the shard is started
+ if (state == ShardRoutingState.RELOCATING) {
+ allocationId = AllocationId.finishRelocation(allocationId);
+ }
+ state = ShardRoutingState.STARTED;
}
/**
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java
new file mode 100644
index 00000000000..74a8b2fde46
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.*;
+
+/**
+ */
+public class AllocationIdTests extends ElasticsearchTestCase {
+
+ @Test
+ public void testShardToStarted() {
+ logger.info("-- create unassigned shard");
+ ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+ assertThat(shard.allocationId(), nullValue());
+
+ logger.info("-- initialize the shard");
+ shard.initialize("node1");
+ AllocationId allocationId = shard.allocationId();
+ assertThat(allocationId, notNullValue());
+ assertThat(allocationId.getId(), notNullValue());
+ assertThat(allocationId.getRelocationId(), nullValue());
+
+ logger.info("-- start the shard");
+ shard.moveToStarted();
+ assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
+ allocationId = shard.allocationId();
+ assertThat(allocationId.getId(), notNullValue());
+ assertThat(allocationId.getRelocationId(), nullValue());
+ }
+
+ @Test
+ public void testSuccessfulRelocation() {
+ logger.info("-- build started shard");
+ ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+ shard.initialize("node1");
+ shard.moveToStarted();
+
+ AllocationId allocationId = shard.allocationId();
+ logger.info("-- relocate the shard");
+ shard.relocate("node2");
+ assertThat(shard.allocationId(), not(equalTo(allocationId)));
+ assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
+ assertThat(shard.allocationId().getRelocationId(), notNullValue());
+ allocationId = shard.allocationId();
+
+ ShardRouting target = shard.buildTargetRelocatingShard();
+ assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId()));
+ assertThat(target.allocationId().getRelocationId(), nullValue());
+
+ logger.info("-- finalize the relocation");
+ shard.moveToStarted();
+ assertThat(shard.allocationId().getId(), equalTo(target.allocationId().getId()));
+ assertThat(shard.allocationId().getRelocationId(), nullValue());
+ }
+
+ @Test
+ public void testCancelRelocation() {
+ logger.info("-- build started shard");
+ ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+ shard.initialize("node1");
+ shard.moveToStarted();
+
+ AllocationId allocationId = shard.allocationId();
+ logger.info("-- relocate the shard");
+ shard.relocate("node2");
+ assertThat(shard.allocationId(), not(equalTo(allocationId)));
+ assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
+ assertThat(shard.allocationId().getRelocationId(), notNullValue());
+ allocationId = shard.allocationId();
+
+ logger.info("-- cancel relocation");
+ shard.cancelRelocation();
+ assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
+ assertThat(shard.allocationId().getRelocationId(), nullValue());
+ }
+
+ @Test
+ public void testMoveToUnassigned() {
+ logger.info("-- build started shard");
+ ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+ shard.initialize("node1");
+ shard.moveToStarted();
+
+ logger.info("-- move to unassigned");
+ shard.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null));
+ assertThat(shard.allocationId(), nullValue());
+ }
+
+ @Test
+ public void testReinitializing() {
+ logger.info("-- build started shard");
+ ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+ shard.initialize("node1");
+ shard.moveToStarted();
+ AllocationId allocationId = shard.allocationId();
+
+ logger.info("-- reinitializing shard");
+ shard.reinitializeShard();
+ assertThat(shard.allocationId().getId(), notNullValue());
+ assertThat(shard.allocationId().getRelocationId(), nullValue());
+ assertThat(shard.allocationId().getId(), not(equalTo(allocationId.getId())));
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java
index a0b1ff63969..99d3d8b3a0b 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java
@@ -26,20 +26,35 @@ package org.elasticsearch.cluster.routing;
public class TestShardRouting {
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
- return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, null, true);
+ return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, null, buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true);
+ }
+
+ private static AllocationId buildAllocationId(ShardRoutingState state) {
+ switch (state) {
+ case UNASSIGNED:
+ return null;
+ case INITIALIZING:
+ case STARTED:
+ return AllocationId.newInitializing();
+ case RELOCATING:
+ AllocationId allocationId = AllocationId.newInitializing();
+ return AllocationId.newRelocation(allocationId);
+ default:
+ throw new IllegalStateException("illegal state");
+ }
}
}