Snapshot/Restore: restore of indices that are only partially available in the cluster

Fixes the issue with restoring of an index that had only some of its primary shards allocated before it was closed.

Fixes #8224
This commit is contained in:
Igor Motov 2014-11-04 09:56:44 -05:00
parent 6f79d67f81
commit b0dde6ee4a
2 changed files with 108 additions and 37 deletions

View File

@ -159,39 +159,47 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
// check if the counts meets the minimum set // check if the counts meets the minimum set
int requiredAllocation = 1; int requiredAllocation = 1;
try { // if we restore from a repository one copy is more then enough
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index()); if (shard.restoreSource() == null) {
String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards)); try {
if ("quorum".equals(initialShards)) { IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
if (indexMetaData.numberOfReplicas() > 1) { String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1; if ("quorum".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
}
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 2) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.numberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = indexMetaData.numberOfReplicas();
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
} }
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) { } catch (Exception e) {
if (indexMetaData.numberOfReplicas() > 2) { logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.numberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = indexMetaData.numberOfReplicas();
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
} }
} catch (Exception e) {
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
} }
// not enough found for this shard, continue... // not enough found for this shard, continue...
if (numberOfAllocationsFound < requiredAllocation) { if (numberOfAllocationsFound < requiredAllocation) {
// we can't really allocate, so ignore it and continue // if we are restoring this shard we still can allocate
unassignedIterator.remove(); if (shard.restoreSource() == null) {
routingNodes.ignoredUnassigned().add(shard); // we can't really allocate, so ignore it and continue
if (logger.isDebugEnabled()) { unassignedIterator.remove();
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation); routingNodes.ignoredUnassigned().add(shard);
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
}
} else if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
} }
continue; continue;
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.snapshots; package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -32,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -48,8 +51,10 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.mockstore.MockRepositoryModule; import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Ignore; import org.junit.Ignore;
@ -190,10 +195,10 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
logger.info("Cluster state: {}", clusterState); logger.info("Cluster state: {}", clusterState);
MetaData metaData = clusterState.getMetaData(); MetaData metaData = clusterState.getMetaData();
assertThat(((SnapshottableMetadata)metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s")); assertThat(((SnapshottableMetadata) metaData.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
assertThat(((NonSnapshottableMetadata)metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns")); assertThat(((NonSnapshottableMetadata) metaData.custom(NonSnapshottableMetadata.TYPE)).getData(), equalTo("after_snapshot_ns"));
assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
logger.info("--> restart all nodes"); logger.info("--> restart all nodes");
internalCluster().fullRestart(); internalCluster().fullRestart();
@ -205,13 +210,13 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
metaData = clusterState.getMetaData(); metaData = clusterState.getMetaData();
assertThat(metaData.custom(SnapshottableMetadata.TYPE), nullValue()); assertThat(metaData.custom(SnapshottableMetadata.TYPE), nullValue());
assertThat(metaData.custom(NonSnapshottableMetadata.TYPE), nullValue()); assertThat(metaData.custom(NonSnapshottableMetadata.TYPE), nullValue());
assertThat(((SnapshottableGatewayMetadata)metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw")); assertThat(((SnapshottableGatewayMetadata) metaData.custom(SnapshottableGatewayMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw"));
assertThat(((NonSnapshottableGatewayMetadata)metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw")); assertThat(((NonSnapshottableGatewayMetadata) metaData.custom(NonSnapshottableGatewayMetadata.TYPE)).getData(), equalTo("after_snapshot_ns_gw"));
// Shouldn't be returned as part of API response // Shouldn't be returned as part of API response
assertThat(metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE), nullValue()); assertThat(metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE), nullValue());
// But should still be in state // But should still be in state
metaData = internalCluster().getInstance(ClusterService.class).state().metaData(); metaData = internalCluster().getInstance(ClusterService.class).state().metaData();
assertThat(((SnapshotableGatewayNoApiMetadata)metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi")); assertThat(((SnapshotableGatewayNoApiMetadata) metaData.custom(SnapshotableGatewayNoApiMetadata.TYPE)).getData(), equalTo("before_snapshot_s_gw_noapi"));
} }
private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException { private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
@ -237,7 +242,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
} }
private static interface ClusterStateUpdater { private static interface ClusterStateUpdater {
public ClusterState execute(ClusterState currentState) throws Exception; public ClusterState execute(ClusterState currentState) throws Exception;
} }
@Test @Test
@ -489,6 +494,64 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L))); assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
} }
@Test
@TestLogging("indices.recovery:TRACE,index.gateway:TRACE,gateway:TRACE")
public void restoreIndexWithShardsMissingInLocalGateway() throws Exception {
logger.info("--> start 2 nodes");
//NO COMMIT: remove HTTP_ENABLED
internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true));
internalCluster().startNode(settingsBuilder().put("gateway.type", "local").put(InternalNode.HTTP_ENABLED, true));
cluster().wipeIndices("_all");
logger.info("--> create repository");
PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).execute().actionGet();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
int numberOfShards = 6;
logger.info("--> create an index that will have some unallocated shards");
assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", numberOfShards)
.put("number_of_replicas", 0)));
ensureGreen();
logger.info("--> indexing some data into test-idx");
for (int i = 0; i < 100; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> start snapshot");
assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx").setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> close the index");
assertAcked(client().admin().indices().prepareClose("test-idx"));
logger.info("--> shutdown one of the nodes that should make half of the shards unavailable");
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public boolean clearData(String nodeName) {
return true;
}
});
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false));
logger.info("--> restore index snapshot");
assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false).setWaitForCompletion(true).get().getRestoreInfo().successfulShards(), equalTo(6));
ensureGreen("test-idx");
assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L));
IntSet reusedShards = IntOpenHashSet.newInstance();
for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) {
if (response.recoveryState().getIndex().reusedByteCount() > 0) {
reusedShards.add(response.getShardId());
}
}
logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards);
assertThat(reusedShards.size(), greaterThanOrEqualTo(numberOfShards/2));
}
@Test @Test
@TestLogging("snapshots:TRACE,repositories:TRACE") @TestLogging("snapshots:TRACE,repositories:TRACE")
@Ignore @Ignore
@ -662,7 +725,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
@Override @Override
public T readFrom(StreamInput in) throws IOException { public T readFrom(StreamInput in) throws IOException {
return (T)newTestCustomMetaData(in.readString()); return (T) newTestCustomMetaData(in.readString());
} }
@Override @Override
@ -692,7 +755,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
if (data == null) { if (data == null) {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found"); throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found");
} }
return (T)newTestCustomMetaData(data); return (T) newTestCustomMetaData(data);
} }
@Override @Override