diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 67f4d71bd4e..f1f6f8aee22 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -41,8 +41,6 @@ import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -62,7 +60,6 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.io.IOException; -import java.text.ParseException; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -440,12 +437,14 @@ public class IndexMetaData implements Diffable, ToXContent { return mappings.get(mappingType); } - public static final Setting INDEX_SHRINK_SOURCE_UUID = Setting.simpleString("index.shrink.source.uuid"); - public static final Setting INDEX_SHRINK_SOURCE_NAME = Setting.simpleString("index.shrink.source.name"); + public static final String INDEX_SHRINK_SOURCE_UUID_KEY = "index.shrink.source.uuid"; + public static final String INDEX_SHRINK_SOURCE_NAME_KEY = "index.shrink.source.name"; + public static final Setting INDEX_SHRINK_SOURCE_UUID = Setting.simpleString(INDEX_SHRINK_SOURCE_UUID_KEY); + public static final Setting INDEX_SHRINK_SOURCE_NAME = Setting.simpleString(INDEX_SHRINK_SOURCE_NAME_KEY); public Index getMergeSourceIndex() { - return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null; + return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null; } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index a3292e2cfd4..fc9ee533090 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -598,7 +598,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { indexSettingsBuilder // we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away // once we are allocated. - .put("index.routing.allocation.initial_recovery._id", + .put(IndexMetaData.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey() + "_id", Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray())) // we only try once and then give up with a shrink index .put("index.allocation.max_retries", 1) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index d80a1c326cf..c587629ef0c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -139,8 +139,8 @@ public class IndexRoutingTable extends AbstractDiffable imple "allocation set " + inSyncAllocationIds); } - if (shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false && - RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false && + if (shardRouting.primary() && shardRouting.initializing() && + shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " + "a known allocation id but has no corresponding entry in the in-sync " + diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index f613cdbbada..32afad99f27 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -29,7 +28,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; -import java.util.EnumSet; import java.util.Objects; /** @@ -249,14 +247,4 @@ public abstract class RecoverySource implements Writeable, ToXContent { return "peer recovery"; } } - - private static EnumSet INITIAL_RECOVERY_TYPES = EnumSet.of(Type.EMPTY_STORE, Type.LOCAL_SHARDS, Type.SNAPSHOT); - - /** - * returns true for recovery types that indicate that a primary is being allocated for the very first time. - * This recoveries can be controlled by {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING} - */ - public static boolean isInitialRecovery(RecoverySource.Type type) { - return INITIAL_RECOVERY_TYPES.contains(type); - } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index 85069392eb6..933b0a829d5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import java.util.EnumSet; + import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; @@ -75,6 +77,17 @@ public class FilterAllocationDecider extends AllocationDecider { public static final Setting CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING = Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope); + /** + * The set of {@link RecoverySource.Type} values for which the + * {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING} should apply. + * Note that we do not include the {@link RecoverySource.Type#SNAPSHOT} type here + * because if the snapshot is restored to a different cluster that does not contain + * the initial recovery node id, or to the same cluster where the initial recovery node + * id has been decommissioned, then the primary shards will never be allocated. + */ + static EnumSet INITIAL_RECOVERY_TYPES = + EnumSet.of(RecoverySource.Type.EMPTY_STORE, RecoverySource.Type.LOCAL_SHARDS); + private volatile DiscoveryNodeFilters clusterRequireFilters; private volatile DiscoveryNodeFilters clusterIncludeFilters; private volatile DiscoveryNodeFilters clusterExcludeFilters; @@ -98,7 +111,7 @@ public class FilterAllocationDecider extends AllocationDecider { IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index()); DiscoveryNodeFilters initialRecoveryFilters = indexMd.getInitialRecoveryFilters(); if (initialRecoveryFilters != null && - RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) && + INITIAL_RECOVERY_TYPES.contains(shardRouting.recoverySource().getType()) && initialRecoveryFilters.match(node.node()) == false) { String explanation = (shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) ? "initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index" : diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 4094d69edde..1568ac288d9 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -191,9 +191,11 @@ public final class IndexScopedSettings extends AbstractScopedSettings { case IndexMetaData.SETTING_VERSION_UPGRADED: case IndexMetaData.SETTING_INDEX_PROVIDED_NAME: case MergePolicyConfig.INDEX_MERGE_ENABLED: + case IndexMetaData.INDEX_SHRINK_SOURCE_UUID_KEY: + case IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY: return true; default: - return false; + return IndexMetaData.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getRawKey().match(key); } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java index e26fece7c6d..a76ad4657c6 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java @@ -352,8 +352,7 @@ public class RoutingTableTests extends ESAllocationTestCase { Set insyncAids = shardTable.activeShards().stream().map( shr -> shr.allocationId().getId()).collect(Collectors.toSet()); final ShardRouting primaryShard = shardTable.primaryShard(); - if (primaryShard.initializing() && primaryShard.relocating() == false && - RecoverySource.isInitialRecovery(primaryShard.recoverySource().getType()) == false ) { + if (primaryShard.initializing() && shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) { // simulate a primary was initialized based on aid insyncAids.add(primaryShard.allocationId().getId()); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java similarity index 89% rename from core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java rename to core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index c201736c51c..c857a3f30ed 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; @@ -29,19 +29,14 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; -import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; -import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; -import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.snapshots.Snapshot; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; @@ -144,8 +139,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { } private ClusterState createInitialClusterState(AllocationService service, Settings settings) { - RecoverySource.Type recoveryType = randomFrom(RecoverySource.Type.EMPTY_STORE, - RecoverySource.Type.LOCAL_SHARDS, RecoverySource.Type.SNAPSHOT); + RecoverySource.Type recoveryType = randomFrom(FilterAllocationDecider.INITIAL_RECOVERY_TYPES); MetaData.Builder metaData = MetaData.builder(); final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings); final IndexMetaData sourceIndex; @@ -164,9 +158,6 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { } final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder("idx").settings(indexSettings) .numberOfShards(1).numberOfReplicas(1); - if (recoveryType == RecoverySource.Type.SNAPSHOT) { - indexMetaDataBuilder.putInSyncAllocationIds(0, Collections.singleton("_snapshot_restore")); - } final IndexMetaData indexMetaData = indexMetaDataBuilder.build(); metaData.put(indexMetaData, false); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); @@ -174,11 +165,6 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { case EMPTY_STORE: routingTableBuilder.addAsNew(indexMetaData); break; - case SNAPSHOT: - routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource( - new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")), - Version.CURRENT, indexMetaData.getIndex().getName())); - break; case LOCAL_SHARDS: routingTableBuilder.addAsFromCloseToOpen(sourceIndex); routingTableBuilder.addAsNew(indexMetaData); @@ -192,7 +178,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { .getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) .build(); - return service.reroute(clusterState, "reroute", false); + return service.reroute(clusterState, "reroute"); } public void testInvalidIPFilter() { diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 08a3308172b..10c49b431a5 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -54,6 +54,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; @@ -127,7 +128,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest NonSnapshottableGatewayMetadata::readDiffFrom, NonSnapshottableGatewayMetadata::fromXContent); registerMetaDataCustom(SnapshotableGatewayNoApiMetadata.TYPE, SnapshotableGatewayNoApiMetadata::readFrom, NonSnapshottableGatewayMetadata::readDiffFrom, SnapshotableGatewayNoApiMetadata::fromXContent); - } @Override @@ -154,8 +154,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest logger.info("--> wait for the second node to join the cluster"); assertThat(client.admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut(), equalTo(false)); - int random = randomIntBetween(10, 42); - logger.info("--> set test persistent setting"); client.admin().cluster().prepareUpdateSettings().setPersistentSettings( Settings.builder() @@ -723,7 +721,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest if (clusterStateError.get() != null) { throw clusterStateError.get(); } - } public void testMasterShutdownDuringSnapshot() throws Exception { @@ -801,33 +798,72 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest assertEquals(0, snapshotInfo.failedShards()); } + /** + * Tests that a shrunken index (created via the shrink APIs) and subsequently snapshotted + * can be restored when the node the shrunken index was created on is no longer part of + * the cluster. + */ + public void testRestoreShrinkIndex() throws Exception { + logger.info("--> starting a master node and a data node"); + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); - private boolean snapshotIsDone(String repository, String snapshot) { - try { - SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus(repository).setSnapshots(snapshot).get(); - if (snapshotsStatusResponse.getSnapshots().isEmpty()) { - return false; - } - for (SnapshotStatus snapshotStatus : snapshotsStatusResponse.getSnapshots()) { - if (snapshotStatus.getState().completed()) { - return true; - } - } - return false; - } catch (SnapshotMissingException ex) { - return false; + final Client client = client(); + final String repo = "test-repo"; + final String snapshot = "test-snap"; + final String sourceIdx = "test-idx"; + final String shrunkIdx = "test-idx-shrunk"; + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository(repo).setType("fs") + .setSettings(Settings.builder().put("location", randomRepoPath()) + .put("compress", randomBoolean()))); + + assertAcked(prepareCreate(sourceIdx, 0, Settings.builder() + .put("number_of_shards", between(1, 20)).put("number_of_replicas", 0))); + ensureGreen(); + + logger.info("--> indexing some data"); + IndexRequestBuilder[] builders = new IndexRequestBuilder[randomIntBetween(10, 100)]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(sourceIdx, "type1", + Integer.toString(i)).setSource("field1", "bar " + i); } - } + indexRandom(true, builders); + flushAndRefresh(); - private void createTestIndex(String name) { - assertAcked(prepareCreate(name, 0, Settings.builder().put("number_of_shards", between(1, 6)) - .put("number_of_replicas", between(1, 6)))); + logger.info("--> shrink the index"); + assertAcked(client.admin().indices().prepareUpdateSettings(sourceIdx) + .setSettings(Settings.builder().put("index.blocks.write", true)).get()); + assertAcked(client.admin().indices().prepareShrinkIndex(sourceIdx, shrunkIdx).get()); - logger.info("--> indexing some data into {}", name); - for (int i = 0; i < between(10, 500); i++) { - index(name, "doc", Integer.toString(i), "foo", "bar" + i); - } + logger.info("--> snapshot the shrunk index"); + CreateSnapshotResponse createResponse = client.admin().cluster() + .prepareCreateSnapshot(repo, snapshot) + .setWaitForCompletion(true).setIndices(shrunkIdx).get(); + assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state()); + logger.info("--> delete index and stop the data node"); + assertAcked(client.admin().indices().prepareDelete(sourceIdx).get()); + assertAcked(client.admin().indices().prepareDelete(shrunkIdx).get()); + internalCluster().stopRandomDataNode(); + client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("1"); + + logger.info("--> start a new data node"); + final Settings dataSettings = Settings.builder() + .put(Node.NODE_NAME_SETTING.getKey(), randomAlphaOfLength(5)) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) // to get a new node id + .build(); + internalCluster().startDataOnlyNode(dataSettings); + client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("2"); + + logger.info("--> restore the shrunk index and ensure all shards are allocated"); + RestoreSnapshotResponse restoreResponse = client().admin().cluster() + .prepareRestoreSnapshot(repo, snapshot).setWaitForCompletion(true) + .setIndices(shrunkIdx).get(); + assertEquals(restoreResponse.getRestoreInfo().totalShards(), + restoreResponse.getRestoreInfo().successfulShards()); + ensureYellow(); } public static class SnapshottableMetadata extends TestCustomMetaData {