diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index bab3dcb2ed2..7e975b922d4 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -115,6 +115,8 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettin import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; +import org.elasticsearch.action.admin.indices.shrink.ShrinkAction; +import org.elasticsearch.action.admin.indices.shrink.TransportShrinkAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction; @@ -286,6 +288,7 @@ public class ActionModule extends AbstractModule { registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); + registerAction(ShrinkAction.INSTANCE, TransportShrinkAction.class); registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class); registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class); registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/IndicesRequest.java b/core/src/main/java/org/elasticsearch/action/IndicesRequest.java index 4c62a7e849b..3ef699818b6 100644 --- a/core/src/main/java/org/elasticsearch/action/IndicesRequest.java +++ b/core/src/main/java/org/elasticsearch/action/IndicesRequest.java @@ -40,7 +40,7 @@ public interface IndicesRequest { */ IndicesOptions indicesOptions(); - static interface Replaceable extends IndicesRequest { + interface Replaceable extends IndicesRequest { /** * Sets the indices that the action relates to. 
*/ diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 72fe0553f60..8d7e75b1626 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.transport.TransportMessage; import java.util.HashMap; @@ -40,6 +41,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final String cause; private final String index; private final boolean updateAllTypes; + private Index shrinkFrom; private IndexMetaData.State state = IndexMetaData.State.OPEN; @@ -54,7 +56,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final Set blocks = new HashSet<>(); - CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { + public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { this.originalMessage = originalMessage; this.cause = cause; this.index = index; @@ -91,6 +93,11 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ return this; } + public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) { + this.shrinkFrom = shrinkFrom; + return this; + } + public TransportMessage originalMessage() { return originalMessage; } @@ -127,6 +134,10 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ return blocks; } + public Index shrinkFrom() { + return shrinkFrom; + } + /** True if all fields that span multiple types should be updated, false otherwise */ public boolean updateAllTypes() { return updateAllTypes; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index 485f0a53f69..b9282002349 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -30,10 +30,10 @@ import java.io.IOException; */ public class CreateIndexResponse extends AcknowledgedResponse { - CreateIndexResponse() { + protected CreateIndexResponse() { } - CreateIndexResponse(boolean acknowledged) { + protected CreateIndexResponse(boolean acknowledged) { super(acknowledged); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java new file mode 100644 index 00000000000..4c09241ad75 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + */ +public class ShrinkAction extends Action<ShrinkRequest, ShrinkResponse, ShrinkRequestBuilder> { + + public static final ShrinkAction INSTANCE = new ShrinkAction(); + public static final String NAME = "indices:admin/shrink"; + + private ShrinkAction() { + super(NAME); + } + + @Override + public ShrinkResponse newResponse() { + return new ShrinkResponse(); + } + + @Override + public ShrinkRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new ShrinkRequestBuilder(client, this); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java new file mode 100644 index 00000000000..069c27ce475 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Request class to shrink an index into a single shard + */ +public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements IndicesRequest { + + private CreateIndexRequest shrinkIndexRequest; + private String sourceIndex; + + ShrinkRequest() {} + + public ShrinkRequest(String targetIndex, String sourceIndex) { + this.shrinkIndexRequest = new CreateIndexRequest(targetIndex); + this.sourceIndex = sourceIndex; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = shrinkIndexRequest == null ? null : shrinkIndexRequest.validate(); + if (sourceIndex == null) { + validationException = addValidationError("source index is missing", validationException); + } + if (shrinkIndexRequest == null) { + validationException = addValidationError("shrink index request is missing", validationException); + } + return validationException; + } + + public void setSourceIndex(String index) { + this.sourceIndex = index; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shrinkIndexRequest = new CreateIndexRequest(); + shrinkIndexRequest.readFrom(in); + sourceIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shrinkIndexRequest.writeTo(out); + out.writeString(sourceIndex); + } + + @Override + public String[] indices() { + return new String[] {sourceIndex}; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.lenientExpandOpen(); + } + + public void setShrinkIndex(CreateIndexRequest shrinkIndexRequest) { + this.shrinkIndexRequest = Objects.requireNonNull(shrinkIndexRequest, "shrink index request must not be null"); + } + + /** + * Returns the {@link CreateIndexRequest} for the shrink index + */ + public CreateIndexRequest getShrinkIndexRequest() { + return shrinkIndexRequest; + } + + /** + * Returns the source index name + */ + public String getSourceIndex() { + return sourceIndex; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java new file mode 100644 index 00000000000..a098215b750 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.settings.Settings; + +public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder<ShrinkRequest, ShrinkResponse, ShrinkRequestBuilder> { + public ShrinkRequestBuilder(ElasticsearchClient client, ShrinkAction action) { + super(client, action, new ShrinkRequest()); + } + + public ShrinkRequestBuilder setTargetIndex(CreateIndexRequest request) { + this.request.setShrinkIndex(request); + return this; + } + + public ShrinkRequestBuilder setSourceIndex(String index) { + this.request.setSourceIndex(index); + return this; + } + + public ShrinkRequestBuilder setSettings(Settings settings) { + this.request.getShrinkIndexRequest().settings(settings); + return this; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java new file mode 100644 index 00000000000..4835471ae4c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; + +public final class ShrinkResponse extends CreateIndexResponse { + ShrinkResponse() { + } + + ShrinkResponse(boolean acknowledged) { + super(acknowledged); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java new file mode 100644 index 00000000000..ca007e31216 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership.
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.apache.lucene.index.IndexWriter; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +/** + * Main class to initiate shrinking an index into a new index with a single shard + */ +public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkRequest, ShrinkResponse> { + + private final MetaDataCreateIndexService createIndexService; + private final Client client; + + @Inject + public TransportShrinkAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, MetaDataCreateIndexService createIndexService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client) { + super(settings, ShrinkAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, + ShrinkRequest::new); + this.createIndexService = createIndexService; + this.client = client; + } + + @Override + protected String executor() { + // we go async right away + return ThreadPool.Names.SAME; + } + + @Override + protected ShrinkResponse newResponse() { + return new ShrinkResponse(); + } + + @Override
+ protected ClusterBlockException checkBlock(ShrinkRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexRequest().index()); + } + + @Override + protected void masterOperation(final ShrinkRequest shrinkRequest, final ClusterState state, + final ActionListener<ShrinkResponse> listener) { + final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkRequest.getSourceIndex()); + client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() { + @Override + public void onResponse(IndicesStatsResponse indicesStatsResponse) { + CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(shrinkRequest, state, + indicesStatsResponse.getTotal().getDocs(), indexNameExpressionResolver); + createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() { + @Override + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new ShrinkResponse(response.isAcknowledged())); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof IndexAlreadyExistsException) { + logger.trace("[{}] failed to create shrink index", t, updateRequest.index()); + } else { + logger.debug("[{}] failed to create shrink index", t, updateRequest.index()); + } + listener.onFailure(t); + } + }); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + + } + + // static for unit testing this method + static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ShrinkRequest shrinkRequest, final ClusterState state, + final DocsStats docsStats, IndexNameExpressionResolver indexNameExpressionResolver) { + final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkRequest.getSourceIndex()); + final CreateIndexRequest targetIndex = shrinkRequest.getShrinkIndexRequest(); + final String targetIndexName = indexNameExpressionResolver.resolveDateMathExpression(targetIndex.index()); + final IndexMetaData metaData = state.metaData().index(sourceIndex); + final Settings targetIndexSettings = Settings.builder().put(targetIndex.settings()) + .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build(); + long count = docsStats.getCount(); + if (count >= IndexWriter.MAX_DOCS) { + throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS + + "] docs - too many documents"); + } + targetIndex.cause("shrink_index"); + targetIndex.settings(Settings.builder() + .put(targetIndexSettings) + // we can only shrink to 1 shard so far!
+ .put("index.number_of_shards", 1) + ); + + return new CreateIndexClusterStateUpdateRequest(targetIndex, + "shrink_index", targetIndexName, true) + // mappings are updated on the node when merging in the shards, this prevents race-conditions since all mapping must be + // applied once we took the snapshot and if somebody fucks things up and switches the index read/write and adds docs we miss + // the mappings for everything is corrupted and hard to debug + .ackTimeout(targetIndex.timeout()) + .masterNodeTimeout(targetIndex.masterNodeTimeout()) + .settings(targetIndex.settings()) + .aliases(targetIndex.aliases()) + .customs(targetIndex.customs()) + .shrinkFrom(metaData.getIndex()); + } + +} diff --git a/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java b/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java index 1cd3a9ad57e..ddc25c88535 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java @@ -35,12 +35,12 @@ import java.util.Map; /** custom policy for union of static and dynamic permissions */ final class ESPolicy extends Policy { - + /** template policy file, the one used in tests */ static final String POLICY_RESOURCE = "security.policy"; /** limited policy for scripts */ static final String UNTRUSTED_RESOURCE = "untrusted.policy"; - + final Policy template; final Policy untrusted; final Policy system; @@ -60,7 +60,7 @@ final class ESPolicy extends Policy { } @Override @SuppressForbidden(reason = "fast equals check is desired") - public boolean implies(ProtectionDomain domain, Permission permission) { + public boolean implies(ProtectionDomain domain, Permission permission) { CodeSource codeSource = domain.getCodeSource(); // codesource can be null when reducing privileges via doPrivileged() if (codeSource == null) { diff --git a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index a475ce15d4e..10275f22d66 100644 --- a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -92,6 +92,9 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRespons import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequestBuilder; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -782,4 +785,19 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ GetSettingsRequestBuilder prepareGetSettings(String... indices); + /** + * Shrinks an index using an explicit request allowing to specify the settings, mappings and aliases of the target index of the index. + */ + ShrinkRequestBuilder prepareShrinkIndex(String sourceIndex, String targetIndex); + + /** + * Shrinks an index using an explicit request allowing to specify the settings, mappings and aliases of the target index of the index. 
*/ + ActionFuture<ShrinkResponse> shrinkIndex(ShrinkRequest request); + + /** + * Shrinks an index using an explicit request allowing to specify the settings, mappings and aliases of the target index. + */ + void shrinkIndex(ShrinkRequest request, ActionListener<ShrinkResponse> listener); + } diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index e36bc4b8d77..6481bec1b83 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -212,6 +212,10 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBui import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.admin.indices.shrink.ShrinkAction; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequestBuilder; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -1684,6 +1688,22 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new GetSettingsRequestBuilder(this, GetSettingsAction.INSTANCE, indices); } + @Override + public ShrinkRequestBuilder prepareShrinkIndex(String sourceIndex, String targetIndex) { + return new ShrinkRequestBuilder(this, ShrinkAction.INSTANCE).setSourceIndex(sourceIndex) + .setTargetIndex(new CreateIndexRequest(targetIndex)); + } + + @Override + public ActionFuture<ShrinkResponse> shrinkIndex(ShrinkRequest request) { + return execute(ShrinkAction.INSTANCE, request); + } + + @Override + public void shrinkIndex(ShrinkRequest request, ActionListener<ShrinkResponse> listener) { + execute(ShrinkAction.INSTANCE, request, listener); + } + @Override public ActionFuture<GetSettingsResponse> getSettings(GetSettingsRequest request) { return execute(GetSettingsAction.INSTANCE, request); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 52721411f46..3a1856c7481 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -383,6 +383,14 @@ public class IndexMetaData implements Diffable, FromXContentBuild return mappings.get(mappingType); } + public static final Setting<String> INDEX_SHRINK_SOURCE_UUID = Setting.simpleString("index.shrink.source.uuid"); + public static final Setting<String> INDEX_SHRINK_SOURCE_NAME = Setting.simpleString("index.shrink.source.name"); + + public Index getMergeSourceIndex() { + return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null; + } + /** * Sometimes, the default mapping exists and an actual mapping is not created yet (introduced), * in this case, we want to return the default mapping in case it has some default mapping definitions.
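A minimal usage sketch of the new client API added above (not part of the diff; the index names and the surrounding class are hypothetical, and a Client instance is assumed to exist). Setting the write block first mirrors the read-only validation added in MetaDataCreateIndexService below:

    import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;

    public class ShrinkUsageSketch {
        // "logs-2016" and "logs-2016-single" are made-up index names.
        static void shrinkToSingleShard(Client client) {
            // the source index must be read-only before shrinking; this mirrors
            // the check in MetaDataCreateIndexService.validateShrinkIndex below
            client.admin().indices().prepareUpdateSettings("logs-2016")
                    .setSettings(Settings.builder().put("index.blocks.write", true).build())
                    .get();
            // shrink the source index into a new single-shard target index
            ShrinkResponse response = client.admin().indices()
                    .prepareShrinkIndex("logs-2016", "logs-2016-single")
                    .get();
            assert response.isAcknowledged();
        }
    }
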
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index c8c352fc46d..5532d4788ad 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -31,11 +32,15 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData.Custom; import org.elasticsearch.cluster.metadata.IndexMetaData.State; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; @@ -55,17 +60,18 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; -import org.elasticsearch.script.ScriptService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -80,6 +86,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; @@ -264,7 +272,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { templatesAliases.put(aliasMetaData.alias(), aliasMetaData); } } - Settings.Builder indexSettingsBuilder = Settings.builder(); // apply templates, here, in reverse order, since first ones are better matching for (int i = templates.size() - 1; i >= 0; i--) { @@ -293,6 +300,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { } 
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); + final Index shrinkFromIndex = request.shrinkFrom(); + if (shrinkFromIndex != null) { + prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, + request.index()); + } Settings actualIndexSettings = indexSettingsBuilder.build(); @@ -481,4 +493,82 @@ return Regex.simpleMatch(template.template(), request.index()); } } + + /** + * Validates the settings and mappings for shrinking an index. + * @return the list of nodes that hold at least one instance of every shard of the source index + */ + static List<String> validateShrinkIndex(ClusterState state, String sourceIndex, + Set<String> targetIndexMappingsTypes, String targetIndexName, + Settings targetIndexSettings) { + if (state.metaData().hasIndex(targetIndexName)) { + throw new IndexAlreadyExistsException(state.metaData().index(targetIndexName).getIndex()); + } + final IndexMetaData sourceMetaData = state.metaData().index(sourceIndex); + if (sourceMetaData == null) { + throw new IndexNotFoundException(sourceIndex); + } + // ensure index is read-only + if (state.blocks().indexBlocked(ClusterBlockLevel.WRITE, sourceIndex) == false) { + throw new IllegalStateException("index " + sourceIndex + " must be read-only to shrink index. use \"index.blocks.write=true\""); + } + + if (sourceMetaData.getNumberOfShards() == 1) { + throw new IllegalArgumentException("can't shrink an index with only one shard"); + } + + if ((targetIndexMappingsTypes.size() > 1 || + (targetIndexMappingsTypes.isEmpty() || targetIndexMappingsTypes.contains(MapperService.DEFAULT_MAPPING)) == false)) { + throw new IllegalArgumentException("mappings are not allowed when shrinking indices" + + ", all mappings are copied from the source index"); + } + if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings) + && IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings) > 1) { + throw new IllegalArgumentException("can not shrink index into more than one shard"); + } + + // now check that index is all on one node + final IndexRoutingTable table = state.routingTable().index(sourceIndex); + Map<String, AtomicInteger> nodesToNumRouting = new HashMap<>(); + int numShards = sourceMetaData.getNumberOfShards(); + for (ShardRouting routing : table.shardsWithState(ShardRoutingState.STARTED)) { + nodesToNumRouting.computeIfAbsent(routing.currentNodeId(), (s) -> new AtomicInteger(0)).incrementAndGet(); + } + List<String> nodesToAllocateOn = new ArrayList<>(); + for (Map.Entry<String, AtomicInteger> entries : nodesToNumRouting.entrySet()) { + int numAllocations = entries.getValue().get(); + assert numAllocations <= numShards : "number of allocations [" + numAllocations + "] exceeds number of shards [" + numShards + "]";
" + numAllocations + " is > than num shards " + numShards; + if (numAllocations == numShards) { + nodesToAllocateOn.add(entries.getKey()); + } + } + if (nodesToAllocateOn.isEmpty()) { + throw new IllegalStateException("index " + sourceIndex + + " must have all shards allocated on the same node to shrink index"); + } + return nodesToAllocateOn; + } + + static void prepareShrinkIndexSettings(ClusterState currentState, Set mappingKeys, Settings.Builder indexSettingsBuilder, Index shrinkFromIndex, String shrinkIntoName) { + final IndexMetaData sourceMetaData = currentState.metaData().index(shrinkFromIndex.getName()); + final List nodesToAllocateOn = validateShrinkIndex(currentState, shrinkFromIndex.getName(), + mappingKeys, shrinkIntoName, indexSettingsBuilder.build()); + final Predicate analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.") + || s.startsWith("index.analysis."); + indexSettingsBuilder + // we can only shrink to 1 index so far! + .put("index.number_of_shards", 1) + // we use "i.r.a.include" rather than "i.r.a.require" since it's allows one of the nodes holding an + // instanceof all shards. + .put("index.routing.allocation.include._id", + Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray())) + // we only try once and then give up with a shrink index + .put("index.allocation.max_retries", 1) + // now copy all similarity / analysis settings - this overrides all settings from the user unless they + // wanna add extra settings + .put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate)) + .put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName()) + .put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID()); + } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 8e099eb14f6..a57601abffc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -19,12 +19,14 @@ package org.elasticsearch.cluster.routing; +import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 201e5297511..fd9191fd713 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -72,6 +72,7 @@ import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestGetStoredSc import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutSearchTemplateAction; import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutStoredScriptAction; import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction; +import org.elasticsearch.rest.action.admin.indices.RestShrinkIndexAction; import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction; import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction; import 
org.elasticsearch.rest.action.admin.indices.alias.get.RestGetAliasesAction; @@ -209,6 +210,7 @@ public class NetworkModule extends AbstractModule { RestIndexPutAliasAction.class, RestIndicesAliasesAction.class, RestCreateIndexAction.class, + RestShrinkIndexAction.class, RestDeleteIndexAction.class, RestCloseIndexAction.class, RestOpenIndexAction.class, diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d4a147d475b..adcf36f694f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -35,8 +35,11 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RestoreSource; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -55,7 +58,10 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.SuspendableRefContainer; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; @@ -102,6 +108,7 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; @@ -125,6 +132,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; public class IndexShard extends AbstractIndexShardComponent { @@ -920,8 +928,7 @@ } else { openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } - final EngineConfig config = newEngineConfig(openMode, translogConfig, cachingPolicy, - new IndexShardRecoveryPerformer(shardId, mapperService, logger)); + final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't lose deletes until we are done recovering config.setEnableGcDeletes(false); @@ -1109,20 +1116,37 @@ return path; } - public boolean recoverFromStore(DiscoveryNode localNode) { + public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData>
mappingUpdateConsumer, List<IndexShard> localShards) throws IOException { + final List<LocalShardSnapshot> snapshots = new ArrayList<>(); + try { + for (IndexShard shard : localShards) { + snapshots.add(new LocalShardSnapshot(shard)); + } + + // we are the first primary, recover from the gateway + // if it's post-API allocation, the index should exist + assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard"; + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); + return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots); + } finally { + IOUtils.close(snapshots); + } + } + + public boolean recoverFromStore() { // we are the first primary, recover from the gateway // if it's post-API allocation, the index should exist assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData()); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromStore(this, shouldExist, localNode); + return storeRecovery.recoverFromStore(this, shouldExist); } - public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode localNode) { + public boolean restoreFromRepository(IndexShardRepository repository) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository, localNode); + return storeRecovery.recoverFromRepository(this, repository); } /** @@ -1337,7 +1361,8 @@ } public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService, - RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService) { + RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, + BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndicesService indicesService) { final RestoreSource restoreSource = shardRouting.restoreSource(); if (shardRouting.isPeerRecovery()) { @@ -1357,19 +1382,59 @@ } } else if (restoreSource == null) { // recover from filesystem store - final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), - RecoveryState.Type.STORE, localNode, localNode); - markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - if (recoverFromStore(localNode)) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Throwable t) { - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true); - } - }); + IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); + Index mergeSourceIndex = indexMetaData.getMergeSourceIndex(); + final boolean recoverFromLocalShards = mergeSourceIndex != null && shardRouting.allocatedPostIndexCreate(indexMetaData) == false && shardRouting.primary(); + final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), + recoverFromLocalShards ?
RecoveryState.Type.LOCAL_SHARDS : RecoveryState.Type.STORE, localNode, localNode); + if (recoverFromLocalShards) { + final List startedShards = new ArrayList<>(); + final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex); + final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1; + if (sourceIndexService != null) { + for (IndexShard shard : sourceIndexService) { + if (shard.state() == IndexShardState.STARTED) { + startedShards.add(shard); + } + } + } + if (numShards == startedShards.size()) { + markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + if (recoverFromLocalShards(mappingUpdateConsumer, startedShards)) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable t) { + recoveryListener.onRecoveryFailure(recoveryState, + new RecoveryFailedException(shardId, localNode, localNode, t), true); + } + }); + } else { + final Throwable t; + if (numShards == -1) { + t = new IndexNotFoundException(mergeSourceIndex); + } else { + t = new IllegalStateException("not all shards from index " + mergeSourceIndex + + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard " + + shardId()); + } + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, localNode, localNode, t), true); + } + } else { + markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + if (recoverFromStore()) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable t) { + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true); + } + + }); + } } else { // recover from a restore final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), @@ -1378,7 +1443,7 @@ public class IndexShard extends AbstractIndexShardComponent { threadPool.generic().execute(() -> { try { final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); - if (restoreFromRepository(indexShardRepository, localNode)) { + if (restoreFromRepository(indexShardRepository)) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Throwable first) { @@ -1461,7 +1526,8 @@ public class IndexShard extends AbstractIndexShardComponent { return mapperService.documentMapperWithAutoCreate(type); } - private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) { + private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { + final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger); return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, @@ -1627,5 +1693,4 @@ public class IndexShard extends AbstractIndexShardComponent { IndexShard.this.delete(engine, engineDelete); } } - } diff --git 
a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java new file mode 100644 index 00000000000..25c59caef99 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.NoLockFactory; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +final class LocalShardSnapshot implements Closeable { + private final IndexShard shard; + private final Store store; + private final IndexCommit indexCommit; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public LocalShardSnapshot(IndexShard shard) { + this.shard = shard; + store = shard.store(); + store.incRef(); + boolean success = false; + try { + indexCommit = shard.snapshotIndex(true); + success = true; + } finally { + if (success == false) { + store.decRef(); + } + } + } + + Index getIndex() { + return shard.indexSettings().getIndex(); + } + + Directory getSnapshotDirectory() { + /* this directory will not be used for anything else but reading / copying files to another directory + * we prevent all write operations on this directory with UOE - nobody should close it either. 
*/ + return new FilterDirectory(store.directory()) { + @Override + public String[] listAll() throws IOException { + Collection<String> fileNames = indexCommit.getFileNames(); + final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]); + return fileNameArray; + } + + @Override + public void deleteFile(String name) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public void sync(Collection<String> names) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public void renameFile(String source, String dest) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public Lock obtainLock(String name) throws IOException { + /* we explicitly use a no-lock instance since we hold an index commit from a SnapshotDeletionPolicy so we + * can be certain that nobody messes with the files on disk. We also hold a ref on the store which means + * no external source will delete files either.*/ + return NoLockFactory.INSTANCE.obtainLock(in, name); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("nobody should close this directory wrapper"); + } + }; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + shard.releaseSnapshot(indexCommit); + } finally { + store.decRef(); + } + } + } + + ImmutableOpenMap<String, MappingMetaData> getMappings() { + return shard.indexSettings.getIndexMetaData().getMappings(); + } + + @Override + public String toString() { + return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + indexCommit + "]"; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 574e68d8cc9..c7d6f35eff2 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -19,20 +19,27 @@ package org.elasticsearch.index.shard; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; +import
org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; @@ -40,6 +47,11 @@ import org.elasticsearch.indices.recovery.RecoveryState; import java.io.IOException; import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -63,12 +75,11 @@ * files / transaction logs. This * @param indexShard the index shard instance to recover the shard into * @param indexShouldExists true iff the index should exist on disk, i.e. the shard has been allocated previously on the shard's store. - * @param localNode the reference to the local node * @return true if the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification, or if the cluster state has changed due to async updates. * @see Store */ - boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists, DiscoveryNode localNode) { + boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists) { if (canRecover(indexShard)) { if (indexShard.routingEntry().restoreSource() != null) { throw new IllegalStateException("can't recover - restore source is not null"); @@ -81,6 +92,141 @@ return false; } + boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, final IndexShard indexShard, final List<LocalShardSnapshot> shards) throws IOException { + if (canRecover(indexShard)) { + assert indexShard.recoveryState().getType() == RecoveryState.Type.LOCAL_SHARDS : "invalid recovery type: " + indexShard.recoveryState().getType(); + if (indexShard.routingEntry().restoreSource() != null) { + throw new IllegalStateException("can't recover - restore source is not null"); + } + if (shards.isEmpty()) { + throw new IllegalArgumentException("shards must not be empty"); + } + Set<Index> indices = shards.stream().map((s) -> s.getIndex()).collect(Collectors.toSet()); + if (indices.size() > 1) { + throw new IllegalArgumentException("can't add shards from more than one index"); + } + for (ObjectObjectCursor<String, MappingMetaData> mapping : shards.get(0).getMappings()) { + mappingUpdateConsumer.accept(mapping.key, mapping.value); + } + for (ObjectObjectCursor<String, MappingMetaData> mapping : shards.get(0).getMappings()) { + indexShard.mapperService().merge(mapping.key, mapping.value.source(), MapperService.MergeReason.MAPPING_RECOVERY, true); + } + return executeRecovery(indexShard, () -> { + logger.debug("starting recovery from local shards {}", shards); + try { + final Directory directory = indexShard.store().directory(); // don't close this directory!! + addIndices(indexShard.recoveryState().getIndex(), directory, shards.stream().map(s -> s.getSnapshotDirectory()) + .collect(Collectors.toList()).toArray(new Directory[shards.size()])); + internalRecoverFromStore(indexShard, true); + // just trigger a merge to do housekeeping on the + // copied segments - we will also see them in stats etc.
+ indexShard.getEngine().forceMerge(false, -1, false, false, false);
+ } catch (IOException ex) {
+ throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
+ }
+
+ });
+ }
+ return false;
+ }
+
+ final void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Directory... sources) throws IOException {
+ /*
+ * TODO: once we upgraded to Lucene 6.1 use HardlinkCopyDirectoryWrapper to enable hardlinks if possible and enable it
+ * in the security.policy:
+ *
+ * grant codeBase "${codebase.lucene-misc-6.1.0.jar}" {
+ * // needed to allow shard shrinking to use hard-links if possible via Lucene's HardlinkCopyDirectoryWrapper
+ * permission java.nio.file.LinkPermission "hard";
+ * };
+ * target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
+ */
+ try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats),
+ new IndexWriterConfig(null)
+ .setCommitOnClose(false)
+ // we don't want merges to happen here - we call maybe merge on the engine
+ // later once we have started it up, otherwise we would need to wait for it here
+ // we also don't specify a codec here - later merges will use the engine's codec for this index
+ .setMergePolicy(NoMergePolicy.INSTANCE)
+ .setOpenMode(IndexWriterConfig.OpenMode.CREATE))) {
+ writer.addIndexes(sources);
+ writer.commit();
+ }
+ }
+
+ /**
+ * Directory wrapper that records the copy process for recovery statistics
+ */
+ static final class StatsDirectoryWrapper extends FilterDirectory {
+ private final RecoveryState.Index index;
+
+ StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) {
+ super(in);
+ this.index = indexRecoveryStats;
+ }
+
+ @Override
+ public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
+ final long l = from.fileLength(src);
+ final AtomicBoolean copies = new AtomicBoolean(false);
+ // here we wrap the index input from the source directory to report progress of the file copy for the recovery stats;
+ // we increment the num bytes recovered in the readBytes method below, so if users pull statistics they can see immediately
+ // how much has been recovered.
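+ // note: the wrapper below only implements sequential reads via readBytes; getFilePointer, seek,
+ // slice and single-byte reads throw, which is sufficient because Directory#copyFrom streams the
+ // whole file through a buffer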
+ in.copyFrom(new FilterDirectory(from) {
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ index.addFileDetail(dest, l, false);
+ copies.set(true);
+ final IndexInput input = in.openInput(name, context);
+ return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") {
+ @Override
+ public void close() throws IOException {
+ input.close();
+ }
+
+ @Override
+ public long getFilePointer() {
+ throw new UnsupportedOperationException("only straight copies are supported");
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new UnsupportedOperationException("seeks are not supported");
+ }
+
+ @Override
+ public long length() {
+ return input.length();
+ }
+
+ @Override
+ public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+ throw new UnsupportedOperationException("slices are not supported");
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ throw new UnsupportedOperationException("single byte reads are not supported - use a buffer");
+ }
+
+ @Override
+ public void readBytes(byte[] b, int offset, int len) throws IOException {
+ // we rely on the fact that copyFrom uses a buffer
+ input.readBytes(b, offset, len);
+ index.addRecoveredBytesToFile(dest, len);
+ }
+ };
+ }
+ }, src, dest, context);
+ if (copies.get() == false) {
+ index.addFileDetail(dest, l, true); // hardlinked - we treat it as reused since the file was already present
+ } else {
+ assert index.getFileDetails(dest) != null : "File [" + dest + "] has no file details";
+ assert index.getFileDetails(dest).recovered() == l : index.getFileDetails(dest).toString();
+ }
+ }
+ }
+ /** * Recovers an index from a given {@link IndexShardRepository}. This method restores a * previously created index snapshot into an existing initializing shard. @@ -89,7 +235,7 @@ * @return true if the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification or if the cluster's state has changed due to async updates.
*/ - boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) { + boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) { if (canRecover(indexShard)) { final ShardRouting shardRouting = indexShard.routingEntry(); if (shardRouting.restoreSource() == null) { @@ -200,29 +346,28 @@ // it exists on the directory, but shouldn't exist on the FS, it's a leftover (possibly dangling) // it's a "new index create" API, we have to do something, so better to clean it than use the same data logger.trace("cleaning existing shard, shouldn't exist"); - IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); - writer.close(); + Lucene.cleanLuceneIndex(store.directory()); } } } catch (Throwable e) { throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e); } recoveryState.getIndex().updateVersion(version); - // since we recover from local, just fill the files and size try { final RecoveryState.Index index = recoveryState.getIndex(); - if (si != null) { - final Directory directory = store.directory(); - for (String name : Lucene.files(si)) { - long length = directory.fileLength(name); - index.addFileDetail(name, length, true); - } + if (si != null && recoveryState.getType() == RecoveryState.Type.STORE) { + addRecoveredFileDetails(si, store, index); } } catch (IOException e) { logger.debug("failed to list file details", e); } - indexShard.performTranslogRecovery(indexShouldExists); + if (recoveryState.getType() == RecoveryState.Type.LOCAL_SHARDS) { + assert indexShouldExists; + indexShard.skipTranslogRecovery(); + } else { + indexShard.performTranslogRecovery(indexShouldExists); + } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); } catch (EngineException | IOException e) { @@ -232,6 +377,14 @@ } }
+ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
+ final Directory directory = store.directory();
+ for (String name : Lucene.files(si)) {
+ long length = directory.fileLength(name);
+ index.addFileDetail(name, length, true);
+ }
+ }
+
/** * Restores shard from {@link RestoreSource} associated with this shard in routing table */ @@ -260,4 +413,5 @@ throw new IndexShardRestoreFailedException(shardId, "restore failed", t); } } + } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index bb162880384..a720f5cb258 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -682,7 +682,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref return refCounter.refCount(); } - private static final class StoreDirectory extends FilterDirectory { + static final class StoreDirectory extends FilterDirectory { private final ESLogger deletesLogger; @@ -715,7 +715,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref public String toString() { return "store(" + in.toString() + ")"; } - } /** diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index
6f9b1684b9e..23518d4b3c3 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -68,6 +69,8 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; @@ -473,7 +476,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { + try { + nodeServicesProvider.getClient().admin().indices().preparePutMapping() + .setConcreteIndex(indexService.index()) // concrete index - no name clash, it uses uuid + .setType(type) + .setSource(mapping.source().string()) + .get(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to stringify mapping source", ex); + } + }, indicesService); } /** @@ -667,7 +679,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(channel)); + } +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5338f927005..f6e6c4aaed5 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -112,7 +112,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet; * method. *

* Individual shards are restored as part of the normal recovery process in - * {@link IndexShard#restoreFromRepository(IndexShardRepository, DiscoveryNode)} )} + * {@link IndexShard#restoreFromRepository(IndexShardRepository)} * method, which detects that the shard should be restored from a snapshot rather than recovered from the gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. *

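For context, the tests below drive the new shrink action through the Java admin client. A minimal sketch of that call sequence, assuming `client` is a connected `Client` instance (index and node names are placeholders, error handling omitted):

[source,java]
--------------------------------------------------
// 1) co-locate all shards of the source index on one node and block writes
client.admin().indices().prepareUpdateSettings("source")
    .setSettings(Settings.builder()
        .put("index.routing.allocation.require._name", "shrink_node_name")
        .put("index.blocks.write", true))
    .get();
// 2) shrink the source index into the single-shard index "target"
client.admin().indices().prepareShrinkIndex("source", "target")
    .setSettings(Settings.builder().put("index.number_of_replicas", 0).build())
    .get();
--------------------------------------------------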
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 3b1040446e0..8ee38c86dc8 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -28,23 +28,31 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.junit.Ignore; import java.util.HashMap; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -277,4 +285,98 @@ public class CreateIndexIT extends ESIntegTestCase { internalCluster().fullRestart(); ensureGreen("test"); }
+
+ public void testCreateShrinkIndex() {
+ internalCluster().ensureAtLeastNumDataNodes(2);
+ prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7))).get();
+ for (int i = 0; i < 20; i++) {
+ client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
+ }
+ ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
+ .getDataNodes();
+ assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
+ DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
+ String mergeNode = discoveryNodes[0].getName();
+ // relocate all shards to one node such that we can merge it.
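+ // the write block and the co-located shards set up below are both hard requirements that
+ // MetaDataCreateIndexService.validateShrinkIndex checks before the target index is created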
+ client().admin().indices().prepareUpdateSettings("source")
+ .setSettings(Settings.builder()
+ .put("index.routing.allocation.require._name", mergeNode)
+ .put("index.blocks.write", true)).get();
+ ensureGreen();
+ // now merge source into a single shard index
+ assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
+ .setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
+ ensureGreen();
+ assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+ // let it be allocated anywhere and bump replicas
+ client().admin().indices().prepareUpdateSettings("target")
+ .setSettings(Settings.builder()
+ .putNull("index.routing.allocation.include._id")
+ .put("index.number_of_replicas", 1)).get();
+ ensureGreen();
+ assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+
+ for (int i = 20; i < 40; i++) {
+ client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
+ }
+ flushAndRefresh();
+ assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
+ assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+
+ }
+ /**
+ * Tests that we can manually recover from a failed allocation due to shards being moved away etc.
+ */
+ public void testCreateShrinkIndexFails() throws Exception {
+ internalCluster().ensureAtLeastNumDataNodes(2);
+ prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
+ .put("number_of_shards", randomIntBetween(2, 7))
+ .put("number_of_replicas", 0)).get();
+ for (int i = 0; i < 20; i++) {
+ client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
+ }
+ ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
+ .getDataNodes();
+ assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
+ DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
+ String spareNode = discoveryNodes[0].getName();
+ String mergeNode = discoveryNodes[1].getName();
+ // relocate all shards to one node such that we can merge it.
+ client().admin().indices().prepareUpdateSettings("source")
+ .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode)
+ .put("index.blocks.write", true)).get();
+ ensureGreen();
+ // now merge source into a single shard index
+ client().admin().indices().prepareShrinkIndex("source", "target")
+ .setSettings(Settings.builder()
+ .put("index.routing.allocation.exclude._name", mergeNode) // we manually exclude the merge node to force the allocation to fail
+ .put("index.number_of_replicas", 0)
+ .put("index.allocation.max_retries", 1).build()).get();
+
+ // now we move all shards away from the merge node
+ client().admin().indices().prepareUpdateSettings("source")
+ .setSettings(Settings.builder().put("index.routing.allocation.require._name", spareNode)
+ .put("index.blocks.write", true)).get();
+ ensureGreen("source");
+
+ client().admin().indices().prepareUpdateSettings("target") // clear the forced allocation failure
+ .setSettings(Settings.builder().putNull("index.routing.allocation.exclude._name")).get(); + // wait until it fails + assertBusy(() -> { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + RoutingTable routingTables = clusterStateResponse.getState().routingTable(); + assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned()); + assertEquals(UnassignedInfo.Reason.ALLOCATION_FAILED, + routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason()); + assertEquals(1, + routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations()); + }); + client().admin().indices().prepareUpdateSettings("source") // now relocate them all to the right node + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", mergeNode)).get(); + ensureGreen("source"); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); // kick off a retry and wait until it's done! + ensureGreen(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java new file mode 100644 index 00000000000..50bf8715f19 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.apache.lucene.index.IndexWriter; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static java.util.Collections.emptyMap; + +public class TransportShrinkActionTests extends ESTestCase { + + private ClusterState createClusterState(String name, int numShards, int numReplicas, Settings settings) { + MetaData.Builder metaBuilder = MetaData.builder(); + IndexMetaData indexMetaData = IndexMetaData.builder(name).settings(settings(Version.CURRENT) + .put(settings)) + .numberOfShards(numShards).numberOfReplicas(numReplicas).build(); + metaBuilder.put(indexMetaData, false); + MetaData metaData = metaBuilder.build(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.addAsNew(metaData.index(name)); + + RoutingTable routingTable = routingTableBuilder.build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData).routingTable(routingTable).blocks(ClusterBlocks.builder().addBlocks(indexMetaData)).build(); + return clusterState; + } + + public void testErrorCondition() { + ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), + Settings.builder().put("index.blocks.write", true).build()); + DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000)); + + assertEquals("Can't merge index with more than [2147483519] docs - too many documents", + expectThrows(IllegalStateException.class, () -> + TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state, + new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY)) + ).getMessage()); + + + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0, + Settings.builder().put("index.blocks.write", 
true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState, stats, + new IndexNameExpressionResolver(Settings.EMPTY)); + } + + public void testShrinkIndexSettings() { + String indexName = randomAsciiOfLength(10); + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0, + Settings.builder() + .put("index.blocks.write", true) + .build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000)); + ShrinkRequest target = new ShrinkRequest("target", indexName); + CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( + target, clusterState, stats, new IndexNameExpressionResolver(Settings.EMPTY)); + assertNotNull(request.shrinkFrom()); + assertEquals(indexName, request.shrinkFrom().getName()); + assertEquals("1", request.settings().get("index.number_of_shards")); + assertEquals("shrink_index", request.cause()); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, emptyMap(), + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), Version.CURRENT); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java new file mode 100644 index 00000000000..49d1da0ddec --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static java.util.Collections.emptyMap; + +public class MetaDataCreateIndexServiceTests extends ESTestCase { + + private ClusterState createClusterState(String name, int numShards, int numReplicas, Settings settings) { + MetaData.Builder metaBuilder = MetaData.builder(); + IndexMetaData indexMetaData = IndexMetaData.builder(name).settings(settings(Version.CURRENT) + .put(settings)) + .numberOfShards(numShards).numberOfReplicas(numReplicas).build(); + metaBuilder.put(indexMetaData, false); + MetaData metaData = metaBuilder.build(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.addAsNew(metaData.index(name)); + + RoutingTable routingTable = routingTableBuilder.build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData).routingTable(routingTable).blocks(ClusterBlocks.builder().addBlocks(indexMetaData)).build(); + return clusterState; + } + + public void testValidateShrinkIndex() { + ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), + Settings.builder().put("index.blocks.write", true).build()); + + assertEquals("index [source] already exists", + expectThrows(IndexAlreadyExistsException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "target", Collections.emptySet(), "source", Settings.EMPTY) + ).getMessage()); + + assertEquals("no such index", + expectThrows(IndexNotFoundException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "no such index", Collections.emptySet(), "target", Settings.EMPTY) + ).getMessage()); + + assertEquals("can't shrink an index with only one shard", + 
expectThrows(IllegalArgumentException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0, + Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), + "target", Settings.EMPTY) + ).getMessage()); + + assertEquals("index source must be read-only to shrink index. use \"index.blocks.write=true\"", + expectThrows(IllegalStateException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex( + createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), Settings.EMPTY) + , "source", Collections.emptySet(), "target", Settings.EMPTY) + ).getMessage()); + + assertEquals("index source must have all shards allocated on the same node to shrink index", + expectThrows(IllegalStateException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", Settings.EMPTY) + + ).getMessage()); + + assertEquals("can not shrink index into more than one shard", + expectThrows(IllegalArgumentException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", + Settings.builder().put("index.number_of_shards", 2).build()) + ).getMessage()); + + assertEquals("mappings are not allowed when shrinking indices, all mappings are copied from the source index", + expectThrows(IllegalArgumentException.class, () -> { + MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.singleton("foo"), + "target", Settings.EMPTY); + } + ).getMessage()); + + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0, + Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target", Settings.EMPTY); + } + + public void testShrinkIndexSettings() { + String indexName = randomAsciiOfLength(10); + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0, + Settings.builder() + .put("index.blocks.write", true) + .put("index.similarity.default.type", "BM25") + .put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword") + .build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = 
service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + Settings.Builder builder = Settings.builder(); + MetaDataCreateIndexService.prepareShrinkIndexSettings( + clusterState, Collections.emptySet(), builder, clusterState.metaData().index(indexName).getIndex(), "target"); + assertEquals("1", builder.build().get("index.number_of_shards")); + assertEquals("similarity settings must be copied", "BM25", builder.build().get("index.similarity.default.type")); + assertEquals("analysis settings must be copied", + "keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer")); + assertEquals("node1", builder.build().get("index.routing.allocation.include._id")); + assertEquals("1", builder.build().get("index.allocation.max_retries")); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, emptyMap(), + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), Version.CURRENT); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java index 81d5c45c39b..707bfd9057e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java @@ -52,6 +52,12 @@ public class ShardRoutingHelper { return routing.reinitializeShard().updateUnassignedInfo(new UnassignedInfo(reason, "test_reinit")); } + public static ShardRouting initWithSameId(ShardRouting copy) { + return new ShardRouting(copy.shardId(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), + copy.primary(), ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, null), + copy.allocationId(), copy.getExpectedShardSize()); + } + public static ShardRouting moveToUnassigned(ShardRouting routing, UnassignedInfo info) { return routing.moveToUnassigned(info); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f96b546855b..ccc1f0a6b36 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -31,9 +31,11 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -46,6 +48,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import 
org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; @@ -58,6 +61,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; @@ -118,6 +122,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -975,7 +980,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -1003,7 +1008,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -1037,7 +1042,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, false); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); try { - newShard.recoverFromStore(localNode); + newShard.recoverFromStore(); fail("index not there!"); } catch (IndexShardRecoveryException ex) { assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over")); @@ -1056,7 +1061,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard = test.createShard(routing); newShard.updateRoutingEntry(routing, false); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode)); + assertTrue("recover even if there is nothing to recover", 
newShard.recoverFromStore()); newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true); SearchResponse response = client().prepareSearch().get(); @@ -1142,7 +1147,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void verify(String verificationToken) { } - }, localNode)); + })); test_target_shard.updateRoutingEntry(routing.moveToStarted(), true); assertHitCount(client().prepareSearch("test_target").get(), 1); @@ -1398,7 +1403,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true); return newShard; } @@ -1507,4 +1512,83 @@ // Shard should now be active since we did recover: assertTrue(newShard.isActive()); }
+
+ public void testRecoverFromLocalShard() throws IOException {
+ createIndex("index");
+ createIndex("index_1");
+ createIndex("index_2");
+ client().admin().indices().preparePutMapping("index").setType("test").setSource(jsonBuilder().startObject()
+ .startObject("test")
+ .startObject("properties")
+ .startObject("foo")
+ .field("type", "text")
+ .endObject()
+ .endObject().endObject().endObject()).get();
+ client().prepareIndex("index", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
+ client().prepareIndex("index", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
+
+
+ IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+ IndexService test = indicesService.indexService(resolveIndex("index_1"));
+ IndexShard shard = test.getShardOrNull(0);
+ ShardRouting routing = ShardRoutingHelper.initWithSameId(shard.routingEntry());
+ test.removeShard(0, "b/c simon says so");
+ DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
+ {
+ final IndexShard newShard = test.createShard(routing);
+ newShard.updateRoutingEntry(routing, false);
+ newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode));
+
+ BiConsumer<String, MappingMetaData> mappingConsumer = (type, mapping) -> {
+ try {
+ client().admin().indices().preparePutMapping().setConcreteIndex(newShard.indexSettings().getIndex())
+ .setType(type)
+ .setSource(mapping.source().string())
+ .get();
+ } catch (IOException ex) {
+ throw new ElasticsearchException("failed to stringify mapping source", ex);
+ }
+ };
+ expectThrows(IllegalArgumentException.class, () -> {
+ IndexService index = indicesService.indexService(resolveIndex("index"));
+ IndexService index_2 = indicesService.indexService(resolveIndex("index_2"));
+ newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(index.getShard(0), index_2.getShard(0)));
+ });
+
+ IndexService indexService = indicesService.indexService(resolveIndex("index"));
+ assertTrue(newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(indexService.getShard(0))));
+ RecoveryState recoveryState = newShard.recoveryState();
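+ // every file copied from the local source shard via StoreRecovery.addIndices is reported as
+ // recovered; files reused via hardlinks (once supported) would report zero recovered bytes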
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
+ assertTrue(recoveryState.getIndex().fileDetails().size() > 0);
+ for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
+ if (file.reused()) {
+ assertEquals(file.recovered(), 0);
+ } else {
+ assertEquals(file.recovered(), file.length());
+ }
+ }
+ routing = ShardRoutingHelper.moveToStarted(routing);
+ newShard.updateRoutingEntry(routing, true);
+ assertHitCount(client().prepareSearch("index_1").get(), 2);
+ }
+ // now check that it's persistent, i.e. that the added shards are committed
+ {
+ routing = shard.routingEntry();
+ test.removeShard(0, "b/c simon says so");
+ routing = ShardRoutingHelper.reinit(routing);
+ final IndexShard newShard = test.createShard(routing);
+ newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode));
+ assertTrue(newShard.recoverFromStore());
+ routing = ShardRoutingHelper.moveToStarted(routing);
+ newShard.updateRoutingEntry(routing, true);
+ assertHitCount(client().prepareSearch("index_1").get(), 2);
+ }
+
+ GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("index_1").get();
+ ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = mappingsResponse.getMappings();
+ assertNotNull(mappings.get("index_1"));
+ assertNotNull(mappings.get("index_1").get("test"));
+ assertEquals(mappings.get("index_1").get("test").source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}");
+
+ } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java new file mode 100644 index 00000000000..a46e0f7562d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.Version; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.security.AccessControlException; +import java.util.Arrays; +import java.util.function.Predicate; +
+public class StoreRecoveryTests extends ESTestCase {
+
+ public void testAddIndices() throws IOException {
+ Directory[] dirs = new Directory[randomIntBetween(1, 10)];
+ final int numDocs = randomIntBetween(50, 100);
+ int id = 0;
+ for (int i = 0; i < dirs.length; i++) {
+ dirs[i] = newFSDirectory(createTempDir());
+ IndexWriter writer = new IndexWriter(dirs[i], newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)
+ .setOpenMode(IndexWriterConfig.OpenMode.CREATE));
+ for (int j = 0; j < numDocs; j++) {
+ writer.addDocument(Arrays.asList(new StringField("id", Integer.toString(id++), Field.Store.YES)));
+ }
+
+ writer.commit();
+ writer.close();
+ }
+ StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
+ RecoveryState.Index indexStats = new RecoveryState.Index();
+ Directory target = newFSDirectory(createTempDir());
+ storeRecovery.addIndices(indexStats, target, dirs);
+ int numFiles = 0;
+ Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
+ && f.startsWith("extra") == false;
+ for (Directory d : dirs) {
+ numFiles += Arrays.asList(d.listAll()).stream().filter(filesFilter).count();
+ }
+ final long targetNumFiles = Arrays.asList(target.listAll()).stream().filter(filesFilter).count();
+ assertEquals(numFiles, targetNumFiles);
+ assertEquals(indexStats.totalFileCount(), targetNumFiles);
+ if (hardLinksSupported(createTempDir())) {
+ assertEquals("upgrade to HardlinkCopyDirectoryWrapper in Lucene 6.1", Version.LATEST, Version.LUCENE_6_0_0);
+ // assertEquals(indexStats.reusedFileCount(), targetNumFiles); -- uncomment this once upgraded to Lucene 6.1
+ assertEquals(indexStats.reusedFileCount(), 0);
+ } else {
+ assertEquals(indexStats.reusedFileCount(), 0);
+ }
+ DirectoryReader reader = DirectoryReader.open(target);
+ SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target);
+ for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
+ assertEquals("all sources must be flush", info.info.getDiagnostics().get("source"), "flush");
+ }
+ assertEquals(reader.numDeletedDocs(), 0);
+ assertEquals(reader.numDocs(), id);
+ reader.close();
+ target.close();
+ IOUtils.close(dirs);
+ }
+
+ public void testStatsDirWrapper() throws IOException {
+ Directory dir = newDirectory();
+ Directory target = newDirectory();
+ RecoveryState.Index indexStats = new RecoveryState.Index();
+ StoreRecovery.StatsDirectoryWrapper wrapper = new
StoreRecovery.StatsDirectoryWrapper(target, indexStats); + try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) { + CodecUtil.writeHeader(output, "foo", 0); + int numBytes = randomIntBetween(100, 20000); + for (int i = 0; i < numBytes; i++) { + output.writeByte((byte)i); + } + CodecUtil.writeFooter(output); + } + wrapper.copyFrom(dir, "foo.bar", "bar.foo", IOContext.DEFAULT); + assertNotNull(indexStats.getFileDetails("bar.foo")); + assertNull(indexStats.getFileDetails("foo.bar")); + assertEquals(dir.fileLength("foo.bar"), indexStats.getFileDetails("bar.foo").length()); + assertEquals(dir.fileLength("foo.bar"), indexStats.getFileDetails("bar.foo").recovered()); + assertFalse(indexStats.getFileDetails("bar.foo").reused()); + IOUtils.close(dir, target); + } + + public boolean hardLinksSupported(Path path) throws IOException { + try { + Files.createFile(path.resolve("foo.bar")); + Files.createLink(path.resolve("test"), path.resolve("foo.bar")); + BasicFileAttributes destAttr = Files.readAttributes(path.resolve("test"), BasicFileAttributes.class); + BasicFileAttributes sourceAttr = Files.readAttributes(path.resolve("foo.bar"), BasicFileAttributes.class); + // we won't get here - no permission ;) + return destAttr.fileKey() != null + && destAttr.fileKey().equals(sourceAttr.fileKey()); + } catch (AccessControlException ex) { + return true; // if we run into that situation we know it's supported. + } catch (UnsupportedOperationException ex) { + return false; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java index 8f64390dbd3..be45b1ad330 100644 --- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -95,9 +95,7 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; public class StoreTests extends ESTestCase { diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 4b11bb1bc96..2722fc9d9d3 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -453,7 +453,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertEquals(1, imc.availableShards().size()); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); newShard.updateRoutingEntry(routing.moveToStarted(), true); } finally { diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index bbb1e8bf2bf..8d59da7da01 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ 
b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -106,7 +106,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode)); - shard.recoverFromStore(localNode); + shard.recoverFromStore(); newRouting = ShardRoutingHelper.moveToStarted(newRouting); shard.updateRoutingEntry(newRouting, true); } finally { diff --git a/docs/reference/indices.asciidoc b/docs/reference/indices.asciidoc index 7543a3f0bbb..86b8bfbf342 100644 --- a/docs/reference/indices.asciidoc +++ b/docs/reference/indices.asciidoc @@ -15,6 +15,7 @@ index settings, aliases, mappings, and index templates. * <> * <> * <> +* <> [float] [[mapping-management]] diff --git a/docs/reference/indices/shrink-index.asciidoc b/docs/reference/indices/shrink-index.asciidoc new file mode 100644 index 00000000000..28532db5240 --- /dev/null +++ b/docs/reference/indices/shrink-index.asciidoc @@ -0,0 +1,83 @@
+[[indices-shrink-index]]
+== Shrink Index
+
+The shrink index API allows you to shrink an existing index into a new index with a single shard.
+In order to shrink an index, all of its shards must be allocated on a single node in the cluster.
+This is required since the shrink command copies all of the shards' index files into the target index's
+data folder when the primary shard of the target index is initially allocated.
+
+While an index is being shrunk, no write operations may happen to the source index; Elasticsearch
+enforces that the source index is `read-only` when the shrink command is executed. All operations necessary to shrink the
+source index are executed during the initial recovery of the target index's primary shard. Once the target index's primary
+shard has started, the shrink operation has successfully finished. To monitor status and progress use <>
+
+
+To shrink an index, all shards of that index must be allocated on a single node.
+
+[source,js]
+--------------------------------------------------
+$ curl -XPUT 'http://localhost:9200/logs/_settings' -d '{
+    "settings" : {
+        "index.routing.allocation.require._name" : "shrink_node_name", <1>
+        "index.blocks.write" : true <2>
+    }
+}'
+--------------------------------------------------
+<1> Forces the relocation of all of the index's shards to the node `shrink_node_name`
+<2> Prevents write operations to this index while still allowing metadata changes like deleting the index.
+
+The curl example above shows how the index `logs` can be
+forced to allocate at least one copy of each of its shards on a specific node in the cluster.
+
+The `_shrink` API is similar to <> and accepts `settings` and `aliases` for the target index.
+
+[source,js]
+--------------------------------------------------
+$ curl -XPUT 'http://localhost:9200/logs/_shrink/logs_single_shard' -d '{
+    "settings" : {
+        "index.codec" : "best_compression", <1>
+        "index.number_of_replicas" : 0 <2>
+    }
+}'
+--------------------------------------------------
+<1> Enables the `best_compression` codec on the target index
+<2> Sets the number of replicas on the target index to `0` to ensure the cluster is green once the shard has initialized
+
+The API call above returns immediately once the target index is created but doesn't wait
+for the shrink operation to start. Once the target index's primary shard moves to the `initializing` state,
+the shrink operation has started.
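+
+While the target's primary shard is initializing, its progress can be followed with the recovery API; a shrink
+recovery is reported with the `LOCAL_SHARDS` recovery type introduced in this change. For example (index name as above):
+
+[source,js]
+--------------------------------------------------
+$ curl -XGET 'http://localhost:9200/logs_single_shard/_recovery?human'
+--------------------------------------------------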
+
+Once the index has been shrunk, the number of replicas can be increased to `1` and the allocation filtering can be removed.
+
+[source,js]
+--------------------------------------------------
+$ curl -XPUT 'http://localhost:9200/logs_single_shard/_settings' -d '{
+    "settings" : {
+        "index.routing.allocation.include._id" : null, <1>
+        "index.number_of_replicas" : 1 <2>
+    }
+}'
+--------------------------------------------------
+
+<1> Resets the allocation filtering for the shrunk index so that replicas can be allocated
+<2> Increases the number of replicas to `1`
+
+
+
+[float]
+[[shrink-index-limitations]]
+=== Limitations
+
+Indices can only be shrunk into a single shard if they satisfy the following requirements:
+
+ * a copy of every one of the index's shards must be allocated on a single node
+ * the index must not contain more than `2147483519` documents (about 2.14 billion) in total (sum of all shards),
+   since this is the maximum number of documents a single shard can hold
+ * the index must have more than one shard
+ * the index must be `read-only`, i.e. have the block `index.blocks.write=true` set
+ * the target index must not exist
+ * all `index.analysis.*` and `index.similarity.*` settings passed to the `_shrink` call will be overwritten with the
+   source index's settings
+ * if the target index can't be allocated on the shrink node, due to throttling or other allocation deciders,
+   its primary shard will stay `unassigned` until it can be allocated on that node
+
diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java index b368a3e99e7..3dad35adaa2 100644 --- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java +++ b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.nio.file.Path; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MMapDirectory; -import org.elasticsearch.index.store.SmbDirectoryWrapper; -public class SmbMMapDirectoryTests extends ESBaseDirectoryTestCase { +public class SmbMMapDirectoryTests extends EsBaseDirectoryTestCase { @Override protected Directory getDirectory(Path file) throws IOException { diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java index d18057a106b..659f6eff7ac 100644 --- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java +++ b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.nio.file.Path; import org.apache.lucene.store.Directory; import org.apache.lucene.store.SimpleFSDirectory; -import org.elasticsearch.index.store.SmbDirectoryWrapper; -public class SmbSimpleFSDirectoryTests extends ESBaseDirectoryTestCase { +public class SmbSimpleFSDirectoryTests extends EsBaseDirectoryTestCase { @Override protected Directory getDirectory(Path file) throws IOException { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json new file mode 100644 index 00000000000..633e9e16093 --- /dev/null +++
+[float]
+[[shrink-index-limitations]]
+=== Limitations
+
+Indices can only be shrunk into a single shard if they satisfy the following requirements:
+
+ * a copy of every one of the index's shards must be allocated on a single node (see the sketch after this list)
+ * the index must not contain more than `2147483519` documents (roughly 2.14 billion) in total (sum of all shards),
+   since this is the maximum number of documents a single shard can support
+ * the index must have more than one shard
+ * the index must be `read-only`, i.e. have the cluster block `index.blocks.write=true` set
+ * the target index must not exist
+ * all `index.analysis.*` and `index.similarity.*` settings passed to the `_shrink` call will be overwritten with the
+   source index's settings
+ * if the target index can't be allocated on the shrink node, due to throttling or other allocation deciders,
+   its primary shard will stay `unassigned` until it can be allocated on that node
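+For instance, the first requirement can be verified with a sketch like the following,
+which uses the `_cat/shards` API to list the node each shard copy of the source index
+is allocated on:
+
+[source,js]
+--------------------------------------------------
+$ curl -XGET 'http://localhost:9200/_cat/shards/logs?v'
+--------------------------------------------------
+
+At least one copy of every shard should report the designated shrink node before
+`_shrink` is invoked.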
diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java
index b368a3e99e7..3dad35adaa2 100644
--- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java
+++ b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java
@@ -23,9 +23,8 @@ import java.io.IOException;
 import java.nio.file.Path;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MMapDirectory;
-import org.elasticsearch.index.store.SmbDirectoryWrapper;

-public class SmbMMapDirectoryTests extends ESBaseDirectoryTestCase {
+public class SmbMMapDirectoryTests extends EsBaseDirectoryTestCase {

     @Override
     protected Directory getDirectory(Path file) throws IOException {
diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java
index d18057a106b..659f6eff7ac 100644
--- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java
+++ b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java
@@ -23,9 +23,8 @@ import java.io.IOException;
 import java.nio.file.Path;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.SimpleFSDirectory;
-import org.elasticsearch.index.store.SmbDirectoryWrapper;

-public class SmbSimpleFSDirectoryTests extends ESBaseDirectoryTestCase {
+public class SmbSimpleFSDirectoryTests extends EsBaseDirectoryTestCase {

     @Override
     protected Directory getDirectory(Path file) throws IOException {
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json
new file mode 100644
index 00000000000..633e9e16093
--- /dev/null
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json
@@ -0,0 +1,35 @@
+{
+  "indices.shrink": {
+    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shrink-index.html",
+    "methods": ["PUT", "POST"],
+    "url": {
+      "path": "/{index}/_shrink/{target}",
+      "paths": ["/{index}/_shrink/{target}"],
+      "parts": {
+        "index": {
+          "type" : "string",
+          "required" : true,
+          "description" : "The name of the source index to shrink"
+        },
+        "target": {
+          "type" : "string",
+          "required" : true,
+          "description" : "The name of the target index to shrink into"
+        }
+      },
+      "params": {
+        "timeout": {
+          "type" : "time",
+          "description" : "Explicit operation timeout"
+        },
+        "master_timeout": {
+          "type" : "time",
+          "description" : "Specify timeout for connection to master"
+        }
+      }
+    },
+    "body": {
+      "description" : "The configuration for the target index (`settings` and `aliases`)"
+    }
+  }
+}
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml
new file mode 100644
index 00000000000..4ef428b9600
--- /dev/null
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml
@@ -0,0 +1,72 @@
+---
+"Shrink index via API":
+  # creates an index with one document,
+  # relocates all its shards to one node,
+  # then shrinks it into a new index with a single shard
+  - do:
+      indices.create:
+        index: source
+        body:
+          settings:
+            number_of_replicas: "0"
+  - do:
+      index:
+        index: source
+        type:  test
+        id:    "1"
+        body:  { "foo": "hello world" }
+
+  - do:
+      get:
+        index: source
+        type:  test
+        id:    "1"
+
+  - match: { _index:  source }
+  - match: { _type:   test }
+  - match: { _id:     "1" }
+  - match: { _source: { foo: "hello world" } }
+
+  - do:
+      cluster.state: {}
+
+  # Get master node id
+  - set: { master_node: master }
+
+  # relocate everything to the master node and make the index read-only
+  - do:
+      indices.put_settings:
+        index: source
+        body:
+          index.routing.allocation.include._id: $master
+          index.blocks.write: true
+          index.number_of_replicas: 0
+
+  - do:
+      cluster.health:
+        wait_for_status: green
+        index: source
+        wait_for_relocating_shards: 0
+
+  # now we do the actual shrink
+  - do:
+      indices.shrink:
+        index: "source"
+        target: "target"
+        body:
+          index.number_of_replicas: 0
+
+  - do:
+      cluster.health:
+        wait_for_status: green
+
+  - do:
+      get:
+        index: target
+        type:  test
+        id:    "1"
+
+  - match: { _index:  target }
+  - match: { _type:   test }
+  - match: { _id:     "1" }
+  - match: { _source: { foo: "hello world" } }
diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java
similarity index 87%
rename from plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java
rename to test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java
index 855ddce9ebd..638d24e7f9c 100644
--- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java
@@ -25,6 +25,8 @@ import org.apache.lucene.store.BaseDirectoryTestCase;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TimeUnits;
 import org.elasticsearch.bootstrap.BootstrapForTesting;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;

 /**
@@ -36,8 +38,11 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
 @TimeoutSuite(millis = TimeUnits.HOUR)
 @LuceneTestCase.SuppressReproduceLine
 @LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
-public abstract class ESBaseDirectoryTestCase extends BaseDirectoryTestCase {
+public abstract class EsBaseDirectoryTestCase extends BaseDirectoryTestCase {
   static {
     BootstrapForTesting.ensureInitialized();
   }
+
+  protected final ESLogger logger = Loggers.getLogger(getClass());
+
 }