diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java
index 8eefed977f0..1fef62e3562 100644
--- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java
+++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java
@@ -159,39 +159,47 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
             // check if the counts meets the minimum set
             int requiredAllocation = 1;
-            try {
-                IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
-                String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
-                if ("quorum".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 1) {
-                        requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
+            // if we restore from a repository one copy is more than enough
+            if (shard.restoreSource() == null) {
+                try {
+                    IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
+                    String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
+                    if ("quorum".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 1) {
+                            requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
+                        }
+                    } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 2) {
+                            requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
+                        }
+                    } else if ("one".equals(initialShards)) {
+                        requiredAllocation = 1;
+                    } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
+                        requiredAllocation = indexMetaData.numberOfReplicas() + 1;
+                    } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
+                        if (indexMetaData.numberOfReplicas() > 1) {
+                            requiredAllocation = indexMetaData.numberOfReplicas();
+                        }
+                    } else {
+                        requiredAllocation = Integer.parseInt(initialShards);
                     }
-                } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 2) {
-                        requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
-                    }
-                } else if ("one".equals(initialShards)) {
-                    requiredAllocation = 1;
-                } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
-                    requiredAllocation = indexMetaData.numberOfReplicas() + 1;
-                } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
-                    if (indexMetaData.numberOfReplicas() > 1) {
-                        requiredAllocation = indexMetaData.numberOfReplicas();
-                    }
-                } else {
-                    requiredAllocation = Integer.parseInt(initialShards);
+                } catch (Exception e) {
+                    logger.warn("[{}][{}] failed to derive initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
+                }
                 }
-            } catch (Exception e) {
-                logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
-            }
             }
 
             // not enough found for this shard, continue...
             if (numberOfAllocationsFound < requiredAllocation) {
-                // we can't really allocate, so ignore it and continue
-                unassignedIterator.remove();
-                routingNodes.ignoredUnassigned().add(shard);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
+                // if we are restoring this shard we can still allocate
+                if (shard.restoreSource() == null) {
+                    // we can't really allocate, so ignore it and continue
+                    unassignedIterator.remove();
+                    routingNodes.ignoredUnassigned().add(shard);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
+                    }
+                } else if (logger.isDebugEnabled()) {
+                    logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
                }
                 continue;
             }
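
Review note (commentary, not part of the patch): the first hunk leaves the `initial_shards` safety check intact for normal local-gateway recovery but skips it entirely when the shard carries a restore source, since a single copy restored from the repository is enough. The mapping from the `index.recovery.initial_shards` setting to the number of copies that must be found locally is easiest to read in isolation. A minimal sketch that mirrors the patched logic; the class and method names here are hypothetical, not Elasticsearch API:

    // Illustration only: initial_shards -> required number of found copies,
    // extracted from the block above. 'replicas' is the index's number_of_replicas.
    class InitialShardsMath {
        static int requiredAllocations(String initialShards, int replicas) {
            int required = 1;                               // default: one copy
            if ("quorum".equals(initialShards)) {
                if (replicas > 1) {
                    required = ((1 + replicas) / 2) + 1;    // e.g. 2 replicas -> 2
                }
            } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
                if (replicas > 2) {
                    required = (1 + replicas) / 2;          // e.g. 3 replicas -> 2
                }
            } else if ("one".equals(initialShards)) {
                required = 1;
            } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
                required = replicas + 1;                    // every copy must be found
            } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
                if (replicas > 1) {
                    required = replicas;                    // all copies but one
                }
            } else {
                required = Integer.parseInt(initialShards); // explicit copy count
            }
            return required;
        }

        public static void main(String[] args) {
            System.out.println(requiredAllocations("quorum", 2)); // prints 2
            System.out.println(requiredAllocations("all", 2));    // prints 3
        }
    }

With the patch, a shard whose `restoreSource()` is non-null never reaches this computation, and the second hunk keeps such a shard allocatable (logging that it will be restored from the repository) instead of parking it on the ignored-unassigned list.
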
diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
index 40d2ae9b090..6b94c0668e6 100644
--- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
+++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
@@ -19,6 +19,8 @@
 package org.elasticsearch.snapshots;
 
+import com.carrotsearch.hppc.IntOpenHashSet;
+import com.carrotsearch.hppc.IntSet;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -32,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -48,8 +51,9 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.store.support.AbstractIndexStore;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
+import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Ignore;
@@ -190,10 +194,10 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
         ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
         logger.info("Cluster state: {}", clusterState);
         MetaData metaData = clusterState.getMetaData();
-        assertThat(((SnapshottableMetadata)metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
-        assertThat(((NonSnapshottableMetadata)metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
-        assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
equalTo("before_snapshot_s_gw")); - assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); + assertThat(((SnapshottableMetadata) metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s")); + assertThat(((NonSnapshottableMetadata) metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns")); + assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); + assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); logger.info("--> restart all nodes"); internalCluster().fullRestart(); @@ -205,13 +210,13 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests metaData = clusterState.getMetaData(); assertThat(metaData.custom(SnapshottableMetadata.TYPE), nullValue()); assertThat(metaData.custom(NonSnapshottableMetadata.TYPE), nullValue()); - assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); - assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); + assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); + assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); // Shouldn't be returned as part of API response assertThat(metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE), nullValue()); // But should still be in state metaData = internalCluster().getInstance(ClusterService.class).state().metaData(); - assertThat(((SnapshotableGatewayNoApiMetadata)metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi")); + assertThat(((SnapshotableGatewayNoApiMetadata) metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi")); } private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException { @@ -237,7 +242,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests } private static interface ClusterStateUpdater { - public ClusterState execute(ClusterState currentState) throws Exception; + public ClusterState execute(ClusterState currentState) throws Exception; } @Test @@ -489,6 +494,64 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L))); } + @Test + @TestLogging("indices.recovery:TRACE,index.gateway:TRACE,gateway:TRACE") + public void restoreIndexWithShardsMissingInLocalGateway() throws Exception { + logger.info("--> start 2 nodes"); + //NO COMMIT: remove HTTP_ENABLED + internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true)); + internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true)); + cluster().wipeIndices("_all"); + + logger.info("--> create repository"); + PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo") + 
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).execute().actionGet(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + int numberOfShards = 6; + logger.info("--> create an index that will have some unallocated shards"); + assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", numberOfShards) + .put("number_of_replicas", 0))); + ensureGreen(); + + logger.info("--> indexing some data into test-idx"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L)); + + logger.info("--> start snapshot"); + assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx").setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> close the index"); + assertAcked(client().admin().indices().prepareClose("test-idx")); + + logger.info("--> shutdown one of the nodes that should make half of the shards unavailable"); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public boolean clearData(String nodeName) { + return true; + } + }); + + assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false)); + + logger.info("--> restore index snapshot"); + assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false).setWaitForCompletion(true).get().getRestoreInfo().successfulShards(), equalTo(6)); + + ensureGreen("test-idx"); + assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L)); + + IntSet reusedShards = IntOpenHashSet.newInstance(); + for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) { + if (response.recoveryState().getIndex().reusedByteCount() > 0) { + reusedShards.add(response.getShardId()); + } + } + logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards); + assertThat(reusedShards.size(), greaterThanOrEqualTo(numberOfShards/2)); + } + @Test @TestLogging("snapshots:TRACE,repositories:TRACE") @Ignore @@ -662,7 +725,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests @Override public T readFrom(StreamInput in) throws IOException { - return (T)newTestCustomMetaData(in.readString()); + return (T) newTestCustomMetaData(in.readString()); } @Override @@ -692,7 +755,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests if (data == null) { throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found"); } - return (T)newTestCustomMetaData(data); + return (T) newTestCustomMetaData(data); } @Override