Fixes restore of a shrunken index when initial recovery node is gone (#24322)
When an index is shrunk using the shrink APIs, the shrink operation adds some internal index settings to the shrink index, for example `index.shrink.source.name|uuid` to denote the source index, as well as `index.routing.allocation.initial_recovery._id` to denote the node on which all shards for the source index resided when the shrunken index was created. However, this presents a problem when taking a snapshot of the shrunken index and restoring it to a cluster where the initial recovery node is not present, or restoring to the same cluster where the initial recovery node is offline or decomissioned. The restore operation fails to allocate the shard in the shrunken index to a node when the initial recovery node is not present, and a restore type of recovery will *not* go through the PrimaryShardAllocator, meaning that it will not have the chance to force allocate the primary to a node in the cluster. Rather, restore initiated shard allocation goes through the BalancedShardAllocator which does not attempt to force allocate a primary. This commit fixes the aforementioned problem by not requiring allocation to occur on the initial recovery node when the recovery type is a restore of a snapshot. This commit also ensures that the internal shrink index settings are recognized and not archived (which can trip an assertion in the restore scenario). Closes #24257
This commit is contained in:
parent
3187ed73fc
commit
0e52e3420e
|
@ -41,8 +41,6 @@ import org.elasticsearch.common.collect.MapBuilder;
|
|||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -62,7 +60,6 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
@ -440,12 +437,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
return mappings.get(mappingType);
|
||||
}
|
||||
|
||||
public static final Setting<String> INDEX_SHRINK_SOURCE_UUID = Setting.simpleString("index.shrink.source.uuid");
|
||||
public static final Setting<String> INDEX_SHRINK_SOURCE_NAME = Setting.simpleString("index.shrink.source.name");
|
||||
public static final String INDEX_SHRINK_SOURCE_UUID_KEY = "index.shrink.source.uuid";
|
||||
public static final String INDEX_SHRINK_SOURCE_NAME_KEY = "index.shrink.source.name";
|
||||
public static final Setting<String> INDEX_SHRINK_SOURCE_UUID = Setting.simpleString(INDEX_SHRINK_SOURCE_UUID_KEY);
|
||||
public static final Setting<String> INDEX_SHRINK_SOURCE_NAME = Setting.simpleString(INDEX_SHRINK_SOURCE_NAME_KEY);
|
||||
|
||||
|
||||
public Index getMergeSourceIndex() {
|
||||
return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null;
|
||||
return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -598,7 +598,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
indexSettingsBuilder
|
||||
// we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away
|
||||
// once we are allocated.
|
||||
.put("index.routing.allocation.initial_recovery._id",
|
||||
.put(IndexMetaData.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey() + "_id",
|
||||
Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()))
|
||||
// we only try once and then give up with a shrink index
|
||||
.put("index.allocation.max_retries", 1)
|
||||
|
|
|
@ -139,8 +139,8 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
"allocation set " + inSyncAllocationIds);
|
||||
}
|
||||
|
||||
if (shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
|
||||
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false &&
|
||||
if (shardRouting.primary() && shardRouting.initializing() &&
|
||||
shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
|
||||
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
|
||||
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
|
||||
"a known allocation id but has no corresponding entry in the in-sync " +
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -29,7 +28,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.snapshots.Snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
|
@ -249,14 +247,4 @@ public abstract class RecoverySource implements Writeable, ToXContent {
|
|||
return "peer recovery";
|
||||
}
|
||||
}
|
||||
|
||||
private static EnumSet<RecoverySource.Type> INITIAL_RECOVERY_TYPES = EnumSet.of(Type.EMPTY_STORE, Type.LOCAL_SHARDS, Type.SNAPSHOT);
|
||||
|
||||
/**
|
||||
* returns true for recovery types that indicate that a primary is being allocated for the very first time.
|
||||
* This recoveries can be controlled by {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING}
|
||||
*/
|
||||
public static boolean isInitialRecovery(RecoverySource.Type type) {
|
||||
return INITIAL_RECOVERY_TYPES.contains(type);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.EnumSet;
|
||||
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
|
||||
|
@ -75,6 +77,17 @@ public class FilterAllocationDecider extends AllocationDecider {
|
|||
public static final Setting<Settings> CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING =
|
||||
Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", IP_VALIDATOR, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
/**
|
||||
* The set of {@link RecoverySource.Type} values for which the
|
||||
* {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING} should apply.
|
||||
* Note that we do not include the {@link RecoverySource.Type#SNAPSHOT} type here
|
||||
* because if the snapshot is restored to a different cluster that does not contain
|
||||
* the initial recovery node id, or to the same cluster where the initial recovery node
|
||||
* id has been decommissioned, then the primary shards will never be allocated.
|
||||
*/
|
||||
static EnumSet<RecoverySource.Type> INITIAL_RECOVERY_TYPES =
|
||||
EnumSet.of(RecoverySource.Type.EMPTY_STORE, RecoverySource.Type.LOCAL_SHARDS);
|
||||
|
||||
private volatile DiscoveryNodeFilters clusterRequireFilters;
|
||||
private volatile DiscoveryNodeFilters clusterIncludeFilters;
|
||||
private volatile DiscoveryNodeFilters clusterExcludeFilters;
|
||||
|
@ -98,7 +111,7 @@ public class FilterAllocationDecider extends AllocationDecider {
|
|||
IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index());
|
||||
DiscoveryNodeFilters initialRecoveryFilters = indexMd.getInitialRecoveryFilters();
|
||||
if (initialRecoveryFilters != null &&
|
||||
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) &&
|
||||
INITIAL_RECOVERY_TYPES.contains(shardRouting.recoverySource().getType()) &&
|
||||
initialRecoveryFilters.match(node.node()) == false) {
|
||||
String explanation = (shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) ?
|
||||
"initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index" :
|
||||
|
|
|
@ -191,9 +191,11 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
case IndexMetaData.SETTING_VERSION_UPGRADED:
|
||||
case IndexMetaData.SETTING_INDEX_PROVIDED_NAME:
|
||||
case MergePolicyConfig.INDEX_MERGE_ENABLED:
|
||||
case IndexMetaData.INDEX_SHRINK_SOURCE_UUID_KEY:
|
||||
case IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
return IndexMetaData.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getRawKey().match(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -352,8 +352,7 @@ public class RoutingTableTests extends ESAllocationTestCase {
|
|||
Set<String> insyncAids = shardTable.activeShards().stream().map(
|
||||
shr -> shr.allocationId().getId()).collect(Collectors.toSet());
|
||||
final ShardRouting primaryShard = shardTable.primaryShard();
|
||||
if (primaryShard.initializing() && primaryShard.relocating() == false &&
|
||||
RecoverySource.isInitialRecovery(primaryShard.recoverySource().getType()) == false ) {
|
||||
if (primaryShard.initializing() && shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
|
||||
// simulate a primary was initialized based on aid
|
||||
insyncAids.add(primaryShard.allocationId().getId());
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -29,19 +29,14 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.test.gateway.TestGatewayAllocator;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -144,8 +139,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
|
||||
RecoverySource.Type recoveryType = randomFrom(RecoverySource.Type.EMPTY_STORE,
|
||||
RecoverySource.Type.LOCAL_SHARDS, RecoverySource.Type.SNAPSHOT);
|
||||
RecoverySource.Type recoveryType = randomFrom(FilterAllocationDecider.INITIAL_RECOVERY_TYPES);
|
||||
MetaData.Builder metaData = MetaData.builder();
|
||||
final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings);
|
||||
final IndexMetaData sourceIndex;
|
||||
|
@ -164,9 +158,6 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
|||
}
|
||||
final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder("idx").settings(indexSettings)
|
||||
.numberOfShards(1).numberOfReplicas(1);
|
||||
if (recoveryType == RecoverySource.Type.SNAPSHOT) {
|
||||
indexMetaDataBuilder.putInSyncAllocationIds(0, Collections.singleton("_snapshot_restore"));
|
||||
}
|
||||
final IndexMetaData indexMetaData = indexMetaDataBuilder.build();
|
||||
metaData.put(indexMetaData, false);
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
|
@ -174,11 +165,6 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
|||
case EMPTY_STORE:
|
||||
routingTableBuilder.addAsNew(indexMetaData);
|
||||
break;
|
||||
case SNAPSHOT:
|
||||
routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource(
|
||||
new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")),
|
||||
Version.CURRENT, indexMetaData.getIndex().getName()));
|
||||
break;
|
||||
case LOCAL_SHARDS:
|
||||
routingTableBuilder.addAsFromCloseToOpen(sourceIndex);
|
||||
routingTableBuilder.addAsNew(indexMetaData);
|
||||
|
@ -192,7 +178,7 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
|||
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
|
||||
.build();
|
||||
return service.reroute(clusterState, "reroute", false);
|
||||
return service.reroute(clusterState, "reroute");
|
||||
}
|
||||
|
||||
public void testInvalidIPFilter() {
|
|
@ -54,6 +54,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -127,7 +128,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
NonSnapshottableGatewayMetadata::readDiffFrom, NonSnapshottableGatewayMetadata::fromXContent);
|
||||
registerMetaDataCustom(SnapshotableGatewayNoApiMetadata.TYPE, SnapshotableGatewayNoApiMetadata::readFrom,
|
||||
NonSnapshottableGatewayMetadata::readDiffFrom, SnapshotableGatewayNoApiMetadata::fromXContent);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,8 +154,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
logger.info("--> wait for the second node to join the cluster");
|
||||
assertThat(client.admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut(), equalTo(false));
|
||||
|
||||
int random = randomIntBetween(10, 42);
|
||||
|
||||
logger.info("--> set test persistent setting");
|
||||
client.admin().cluster().prepareUpdateSettings().setPersistentSettings(
|
||||
Settings.builder()
|
||||
|
@ -723,7 +721,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
if (clusterStateError.get() != null) {
|
||||
throw clusterStateError.get();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testMasterShutdownDuringSnapshot() throws Exception {
|
||||
|
@ -801,33 +798,72 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
assertEquals(0, snapshotInfo.failedShards());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that a shrunken index (created via the shrink APIs) and subsequently snapshotted
|
||||
* can be restored when the node the shrunken index was created on is no longer part of
|
||||
* the cluster.
|
||||
*/
|
||||
public void testRestoreShrinkIndex() throws Exception {
|
||||
logger.info("--> starting a master node and a data node");
|
||||
internalCluster().startMasterOnlyNode();
|
||||
internalCluster().startDataOnlyNode();
|
||||
|
||||
private boolean snapshotIsDone(String repository, String snapshot) {
|
||||
try {
|
||||
SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus(repository).setSnapshots(snapshot).get();
|
||||
if (snapshotsStatusResponse.getSnapshots().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
for (SnapshotStatus snapshotStatus : snapshotsStatusResponse.getSnapshots()) {
|
||||
if (snapshotStatus.getState().completed()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch (SnapshotMissingException ex) {
|
||||
return false;
|
||||
final Client client = client();
|
||||
final String repo = "test-repo";
|
||||
final String snapshot = "test-snap";
|
||||
final String sourceIdx = "test-idx";
|
||||
final String shrunkIdx = "test-idx-shrunk";
|
||||
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client.admin().cluster().preparePutRepository(repo).setType("fs")
|
||||
.setSettings(Settings.builder().put("location", randomRepoPath())
|
||||
.put("compress", randomBoolean())));
|
||||
|
||||
assertAcked(prepareCreate(sourceIdx, 0, Settings.builder()
|
||||
.put("number_of_shards", between(1, 20)).put("number_of_replicas", 0)));
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
IndexRequestBuilder[] builders = new IndexRequestBuilder[randomIntBetween(10, 100)];
|
||||
for (int i = 0; i < builders.length; i++) {
|
||||
builders[i] = client().prepareIndex(sourceIdx, "type1",
|
||||
Integer.toString(i)).setSource("field1", "bar " + i);
|
||||
}
|
||||
}
|
||||
indexRandom(true, builders);
|
||||
flushAndRefresh();
|
||||
|
||||
private void createTestIndex(String name) {
|
||||
assertAcked(prepareCreate(name, 0, Settings.builder().put("number_of_shards", between(1, 6))
|
||||
.put("number_of_replicas", between(1, 6))));
|
||||
logger.info("--> shrink the index");
|
||||
assertAcked(client.admin().indices().prepareUpdateSettings(sourceIdx)
|
||||
.setSettings(Settings.builder().put("index.blocks.write", true)).get());
|
||||
assertAcked(client.admin().indices().prepareShrinkIndex(sourceIdx, shrunkIdx).get());
|
||||
|
||||
logger.info("--> indexing some data into {}", name);
|
||||
for (int i = 0; i < between(10, 500); i++) {
|
||||
index(name, "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
}
|
||||
logger.info("--> snapshot the shrunk index");
|
||||
CreateSnapshotResponse createResponse = client.admin().cluster()
|
||||
.prepareCreateSnapshot(repo, snapshot)
|
||||
.setWaitForCompletion(true).setIndices(shrunkIdx).get();
|
||||
assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state());
|
||||
|
||||
logger.info("--> delete index and stop the data node");
|
||||
assertAcked(client.admin().indices().prepareDelete(sourceIdx).get());
|
||||
assertAcked(client.admin().indices().prepareDelete(shrunkIdx).get());
|
||||
internalCluster().stopRandomDataNode();
|
||||
client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("1");
|
||||
|
||||
logger.info("--> start a new data node");
|
||||
final Settings dataSettings = Settings.builder()
|
||||
.put(Node.NODE_NAME_SETTING.getKey(), randomAlphaOfLength(5))
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) // to get a new node id
|
||||
.build();
|
||||
internalCluster().startDataOnlyNode(dataSettings);
|
||||
client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("2");
|
||||
|
||||
logger.info("--> restore the shrunk index and ensure all shards are allocated");
|
||||
RestoreSnapshotResponse restoreResponse = client().admin().cluster()
|
||||
.prepareRestoreSnapshot(repo, snapshot).setWaitForCompletion(true)
|
||||
.setIndices(shrunkIdx).get();
|
||||
assertEquals(restoreResponse.getRestoreInfo().totalShards(),
|
||||
restoreResponse.getRestoreInfo().successfulShards());
|
||||
ensureYellow();
|
||||
}
|
||||
|
||||
public static class SnapshottableMetadata extends TestCustomMetaData {
|
||||
|
|
Loading…
Reference in New Issue