From 502a775a7c938aeda566566fa22416c5260cd858 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 31 May 2016 10:41:44 +0200 Subject: [PATCH] Add primitive to shrink an index into a single shard (#18270) This adds a low level primitive operations to shrink an existing index into a new index with a single shard. This primitive expects all shards of the source index to allocated on a single node. Once the target index is initializing on the shrink node it takes a snapshot of the source index shards and copies all files into the target indices data folder. An [optimization](https://issues.apache.org/jira/browse/LUCENE-7300) coming in Lucene 6.1 will also allow for optional constant time copy if hard-links are supported by the filesystem. All mappings are merged into the new indexes metadata once the snapshots have been taken on the merge node. To shrink an existing index all shards must be moved to a single node (one instance of each shard) and the index must be read-only: ```BASH $ curl -XPUT 'http://localhost:9200/logs/_settings' -d '{ "settings" : { "index.routing.allocation.require._name" : "shrink_node_name", "index.blocks.write" : true } } ``` once all shards are started on the shrink node. the new index can be created via: ```BASH $ curl -XPUT 'http://localhost:9200/logs/_shrink/logs_single_shard' -d '{ "settings" : { "index.codec" : "best_compression", "index.number_of_replicas" : 1 } }' ``` This API will perform all needed check before the new index is created and selects the shrink node based on the allocation of the source index. This call returns immediately, to monitor shrink progress the recovery API should be used since all copy operations are reflected in the recovery API with byte copy progress etc. The shrink operation does not modify the source index, if a shrink operation should be canceled or if the shrink failed, the target index can simply be deleted and all resources are released. --- .../elasticsearch/action/ActionModule.java | 3 + .../elasticsearch/action/IndicesRequest.java | 2 +- .../CreateIndexClusterStateUpdateRequest.java | 13 +- .../indices/create/CreateIndexResponse.java | 4 +- .../admin/indices/shrink/ShrinkAction.java | 45 +++++ .../admin/indices/shrink/ShrinkRequest.java | 107 ++++++++++ .../indices/shrink/ShrinkRequestBuilder.java | 47 +++++ .../admin/indices/shrink/ShrinkResponse.java | 31 +++ .../indices/shrink/TransportShrinkAction.java | 162 ++++++++++++++++ .../org/elasticsearch/bootstrap/ESPolicy.java | 6 +- .../client/IndicesAdminClient.java | 18 ++ .../client/support/AbstractClient.java | 20 ++ .../cluster/metadata/IndexMetaData.java | 8 + .../metadata/MetaDataCreateIndexService.java | 94 ++++++++- .../cluster/routing/RoutingNode.java | 2 + .../common/network/NetworkModule.java | 2 + .../elasticsearch/index/shard/IndexShard.java | 109 ++++++++--- .../index/shard/LocalShardSnapshot.java | 138 +++++++++++++ .../index/shard/StoreRecovery.java | 182 ++++++++++++++++-- .../org/elasticsearch/index/store/Store.java | 3 +- .../cluster/IndicesClusterStateService.java | 19 +- .../indices/recovery/RecoveryState.java | 9 +- .../admin/indices/RestShrinkIndexAction.java | 61 ++++++ .../snapshots/RestoreService.java | 2 +- .../admin/indices/create/CreateIndexIT.java | 102 ++++++++++ .../shrink/TransportShrinkActionTests.java | 136 +++++++++++++ .../MetaDataCreateIndexServiceTests.java | 166 ++++++++++++++++ .../cluster/routing/ShardRoutingHelper.java | 6 + .../index/shard/IndexShardTests.java | 96 ++++++++- .../index/shard/StoreRecoveryTests.java | 132 +++++++++++++ .../elasticsearch/index/store/StoreTests.java | 2 - .../IndexingMemoryControllerTests.java | 2 +- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- docs/reference/indices.asciidoc | 1 + docs/reference/indices/shrink-index.asciidoc | 83 ++++++++ .../index/store/SmbMMapDirectoryTests.java | 3 +- .../store/SmbSimpleFSDirectoryTests.java | 3 +- .../rest-api-spec/api/indices.shrink.json | 35 ++++ .../test/indices.shrink/10_basic.yaml | 72 +++++++ .../index/store/EsBaseDirectoryTestCase.java | 7 +- 40 files changed, 1867 insertions(+), 68 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java create mode 100644 core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java create mode 100644 core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java create mode 100644 core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java create mode 100644 core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java create mode 100644 docs/reference/indices/shrink-index.asciidoc create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml rename plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java => test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java (87%) diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index bab3dcb2ed2..7e975b922d4 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -115,6 +115,8 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettin import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; +import org.elasticsearch.action.admin.indices.shrink.ShrinkAction; +import org.elasticsearch.action.admin.indices.shrink.TransportShrinkAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction; @@ -286,6 +288,7 @@ public class ActionModule extends AbstractModule { registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); + registerAction(ShrinkAction.INSTANCE, TransportShrinkAction.class); registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class); registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class); registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/IndicesRequest.java b/core/src/main/java/org/elasticsearch/action/IndicesRequest.java index 4c62a7e849b..3ef699818b6 100644 --- a/core/src/main/java/org/elasticsearch/action/IndicesRequest.java +++ b/core/src/main/java/org/elasticsearch/action/IndicesRequest.java @@ -40,7 +40,7 @@ public interface IndicesRequest { */ IndicesOptions indicesOptions(); - static interface Replaceable extends IndicesRequest { + interface Replaceable extends IndicesRequest { /** * Sets the indices that the action relates to. */ diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 72fe0553f60..8d7e75b1626 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.transport.TransportMessage; import java.util.HashMap; @@ -40,6 +41,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final String cause; private final String index; private final boolean updateAllTypes; + private Index shrinkFrom; private IndexMetaData.State state = IndexMetaData.State.OPEN; @@ -54,7 +56,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final Set blocks = new HashSet<>(); - CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { + public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { this.originalMessage = originalMessage; this.cause = cause; this.index = index; @@ -91,6 +93,11 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ return this; } + public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) { + this.shrinkFrom = shrinkFrom; + return this; + } + public TransportMessage originalMessage() { return originalMessage; } @@ -127,6 +134,10 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ return blocks; } + public Index shrinkFrom() { + return shrinkFrom; + } + /** True if all fields that span multiple types should be updated, false otherwise */ public boolean updateAllTypes() { return updateAllTypes; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index 485f0a53f69..b9282002349 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -30,10 +30,10 @@ import java.io.IOException; */ public class CreateIndexResponse extends AcknowledgedResponse { - CreateIndexResponse() { + protected CreateIndexResponse() { } - CreateIndexResponse(boolean acknowledged) { + protected CreateIndexResponse(boolean acknowledged) { super(acknowledged); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java new file mode 100644 index 00000000000..4c09241ad75 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + */ +public class ShrinkAction extends Action { + + public static final ShrinkAction INSTANCE = new ShrinkAction(); + public static final String NAME = "indices:admin/shrink"; + + private ShrinkAction() { + super(NAME); + } + + @Override + public ShrinkResponse newResponse() { + return new ShrinkResponse(); + } + + @Override + public ShrinkRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new ShrinkRequestBuilder(client, this); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java new file mode 100644 index 00000000000..069c27ce475 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Request class to shrink an index into a single shard + */ +public class ShrinkRequest extends AcknowledgedRequest implements IndicesRequest { + + private CreateIndexRequest shrinkIndexRequest; + private String sourceIndex; + + ShrinkRequest() {} + + public ShrinkRequest(String targetIndex, String sourceindex) { + this.shrinkIndexRequest = new CreateIndexRequest(targetIndex); + this.sourceIndex = sourceindex; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = shrinkIndexRequest == null ? null : shrinkIndexRequest.validate(); + if (sourceIndex == null) { + validationException = addValidationError("source index is missing", validationException); + } + if (shrinkIndexRequest == null) { + validationException = addValidationError("shrink index request is missing", validationException); + } + return validationException; + } + + public void setSourceIndex(String index) { + this.sourceIndex = index; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shrinkIndexRequest = new CreateIndexRequest(); + shrinkIndexRequest.readFrom(in); + sourceIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shrinkIndexRequest.writeTo(out); + out.writeString(sourceIndex); + } + + @Override + public String[] indices() { + return new String[] {sourceIndex}; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.lenientExpandOpen(); + } + + public void setShrinkIndex(CreateIndexRequest shrinkIndexRequest) { + this.shrinkIndexRequest = Objects.requireNonNull(shrinkIndexRequest, "shrink index request must not be null"); + } + + /** + * Returns the {@link CreateIndexRequest} for the shrink index + */ + public CreateIndexRequest getShrinkIndexReqeust() { + return shrinkIndexRequest; + } + + /** + * Returns the source index name + */ + public String getSourceIndex() { + return sourceIndex; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java new file mode 100644 index 00000000000..a098215b750 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.settings.Settings; + +public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder { + public ShrinkRequestBuilder(ElasticsearchClient client, ShrinkAction action) { + super(client, action, new ShrinkRequest()); + } + + + public ShrinkRequestBuilder setTargetIndex(CreateIndexRequest request) { + this.request.setShrinkIndex(request); + return this; + } + + public ShrinkRequestBuilder setSourceIndex(String index) { + this.request.setSourceIndex(index); + return this; + } + + public ShrinkRequestBuilder setSettings(Settings settings) { + this.request.getShrinkIndexReqeust().settings(settings); + return this; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java new file mode 100644 index 00000000000..4835471ae4c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; + +public final class ShrinkResponse extends CreateIndexResponse { + ShrinkResponse() { + } + + ShrinkResponse(boolean acknowledged) { + super(acknowledged); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java new file mode 100644 index 00000000000..ca007e31216 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.apache.lucene.index.IndexWriter; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +/** + * Main class to initiate shrinking an index into a new index with a single shard + */ +public class TransportShrinkAction extends TransportMasterNodeAction { + + private final MetaDataCreateIndexService createIndexService; + private final Client client; + + @Inject + public TransportShrinkAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, MetaDataCreateIndexService createIndexService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client) { + super(settings, ShrinkAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, + ShrinkRequest::new); + this.createIndexService = createIndexService; + this.client = client; + } + + @Override + protected String executor() { + // we go async right away + return ThreadPool.Names.SAME; + } + + @Override + protected ShrinkResponse newResponse() { + return new ShrinkResponse(); + } + + @Override + protected ClusterBlockException checkBlock(ShrinkRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexReqeust().index()); + } + + @Override + protected void masterOperation(final ShrinkRequest shrinkRequest, final ClusterState state, + final ActionListener listener) { + final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkRequest.getSourceIndex()); + client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener() { + @Override + public void onResponse(IndicesStatsResponse indicesStatsResponse) { + CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(shrinkRequest, state, + indicesStatsResponse.getTotal().getDocs(), indexNameExpressionResolver); + createIndexService.createIndex(updateRequest, new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new ShrinkResponse(response.isAcknowledged())); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof IndexAlreadyExistsException) { + logger.trace("[{}] failed to create shrink index", t, updateRequest.index()); + } else { + logger.debug("[{}] failed to create shrink index", t, updateRequest.index()); + } + listener.onFailure(t); + } + }); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + + } + + // static for unittesting this method + static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ShrinkRequest shrinkReqeust, final ClusterState state + , final DocsStats docsStats, IndexNameExpressionResolver indexNameExpressionResolver) { + final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkReqeust.getSourceIndex()); + final CreateIndexRequest targetIndex = shrinkReqeust.getShrinkIndexReqeust(); + final String targetIndexName = indexNameExpressionResolver.resolveDateMathExpression(targetIndex.index()); + final IndexMetaData metaData = state.metaData().index(sourceIndex); + final Settings targetIndexSettings = Settings.builder().put(targetIndex.settings()) + .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build(); + long count = docsStats.getCount(); + if (count >= IndexWriter.MAX_DOCS) { + throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS + + "] docs - too many documents"); + } + targetIndex.cause("shrink_index"); + targetIndex.settings(Settings.builder() + .put(targetIndexSettings) + // we can only shrink to 1 index so far! + .put("index.number_of_shards", 1) + ); + + return new CreateIndexClusterStateUpdateRequest(targetIndex, + "shrink_index", targetIndexName, true) + // mappings are updated on the node when merging in the shards, this prevents race-conditions since all mapping must be + // applied once we took the snapshot and if somebody fucks things up and switches the index read/write and adds docs we miss + // the mappings for everything is corrupted and hard to debug + .ackTimeout(targetIndex.timeout()) + .masterNodeTimeout(targetIndex.masterNodeTimeout()) + .settings(targetIndex.settings()) + .aliases(targetIndex.aliases()) + .customs(targetIndex.customs()) + .shrinkFrom(metaData.getIndex()); + } + +} diff --git a/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java b/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java index 1cd3a9ad57e..ddc25c88535 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java @@ -35,12 +35,12 @@ import java.util.Map; /** custom policy for union of static and dynamic permissions */ final class ESPolicy extends Policy { - + /** template policy file, the one used in tests */ static final String POLICY_RESOURCE = "security.policy"; /** limited policy for scripts */ static final String UNTRUSTED_RESOURCE = "untrusted.policy"; - + final Policy template; final Policy untrusted; final Policy system; @@ -60,7 +60,7 @@ final class ESPolicy extends Policy { } @Override @SuppressForbidden(reason = "fast equals check is desired") - public boolean implies(ProtectionDomain domain, Permission permission) { + public boolean implies(ProtectionDomain domain, Permission permission) { CodeSource codeSource = domain.getCodeSource(); // codesource can be null when reducing privileges via doPrivileged() if (codeSource == null) { diff --git a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index a475ce15d4e..10275f22d66 100644 --- a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -92,6 +92,9 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRespons import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequestBuilder; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -782,4 +785,19 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ GetSettingsRequestBuilder prepareGetSettings(String... indices); + /** + * Shrinks an index using an explicit request allowing to specify the settings, mappings and aliases of the target index of the index. + */ + ShrinkRequestBuilder prepareShrinkIndex(String sourceIndex, String targetIndex); + + /** + * Shrinks an index using an explicit request allowing to specify the settings, mappings and aliases of the target index of the index. + */ + ActionFuture shrinkIndex(ShrinkRequest request); + + /** + * Shrinks an index using an explicit request allowing to specify the settings, mappings and aliases of the target index of the index. + */ + void shrinkIndex(ShrinkRequest request, ActionListener listener); + } diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index e36bc4b8d77..6481bec1b83 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -212,6 +212,10 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBui import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.admin.indices.shrink.ShrinkAction; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; +import org.elasticsearch.action.admin.indices.shrink.ShrinkRequestBuilder; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -1684,6 +1688,22 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new GetSettingsRequestBuilder(this, GetSettingsAction.INSTANCE, indices); } + @Override + public ShrinkRequestBuilder prepareShrinkIndex(String sourceIndex, String targetIndex) { + return new ShrinkRequestBuilder(this, ShrinkAction.INSTANCE).setSourceIndex(sourceIndex) + .setTargetIndex(new CreateIndexRequest(targetIndex)); + } + + @Override + public ActionFuture shrinkIndex(ShrinkRequest request) { + return execute(ShrinkAction.INSTANCE, request); + } + + @Override + public void shrinkIndex(ShrinkRequest request, ActionListener listener) { + execute(ShrinkAction.INSTANCE, request, listener); + } + @Override public ActionFuture getSettings(GetSettingsRequest request) { return execute(GetSettingsAction.INSTANCE, request); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 52721411f46..3a1856c7481 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -383,6 +383,14 @@ public class IndexMetaData implements Diffable, FromXContentBuild return mappings.get(mappingType); } + public static final Setting INDEX_SHRINK_SOURCE_UUID = Setting.simpleString("index.shrink.source.uuid"); + public static final Setting INDEX_SHRINK_SOURCE_NAME = Setting.simpleString("index.shrink.source.name"); + + + public Index getMergeSourceIndex() { + return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null; + } + /** * Sometimes, the default mapping exists and an actual mapping is not created yet (introduced), * in this case, we want to return the default mapping in case it has some default mapping definitions. diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index c8c352fc46d..5532d4788ad 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -31,11 +32,15 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData.Custom; import org.elasticsearch.cluster.metadata.IndexMetaData.State; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; @@ -55,17 +60,18 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; -import org.elasticsearch.script.ScriptService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -80,6 +86,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; @@ -264,7 +272,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { templatesAliases.put(aliasMetaData.alias(), aliasMetaData); } } - Settings.Builder indexSettingsBuilder = Settings.builder(); // apply templates, here, in reverse order, since first ones are better matching for (int i = templates.size() - 1; i >= 0; i--) { @@ -293,6 +300,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { } indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); + final Index shrinkFromIndex = request.shrinkFrom(); + if (shrinkFromIndex != null) { + prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, + request.index()); + } Settings actualIndexSettings = indexSettingsBuilder.build(); @@ -481,4 +493,82 @@ public class MetaDataCreateIndexService extends AbstractComponent { return Regex.simpleMatch(template.template(), request.index()); } } + + /** + * Validates the settings and mappings for shrinking an index. + * @return the list of nodes at least one instance of the source index shards are allocated + */ + static List validateShrinkIndex(ClusterState state, String sourceIndex, + Set targetIndexMappingsTypes, String targetIndexName, + Settings targetIndexSettings) { + if (state.metaData().hasIndex(targetIndexName)) { + throw new IndexAlreadyExistsException(state.metaData().index(targetIndexName).getIndex()); + } + final IndexMetaData sourceMetaData = state.metaData().index(sourceIndex); + if (sourceMetaData == null) { + throw new IndexNotFoundException(sourceIndex); + } + // ensure index is read-only + if (state.blocks().indexBlocked(ClusterBlockLevel.WRITE, sourceIndex) == false) { + throw new IllegalStateException("index " + sourceIndex + " must be read-only to shrink index. use \"index.blocks.write=true\""); + } + + if (sourceMetaData.getNumberOfShards() == 1) { + throw new IllegalArgumentException("can't shrink an index with only one shard"); + } + + if ((targetIndexMappingsTypes.size() > 1 || + (targetIndexMappingsTypes.isEmpty() || targetIndexMappingsTypes.contains(MapperService.DEFAULT_MAPPING)) == false)) { + throw new IllegalArgumentException("mappings are not allowed when shrinking indices" + + ", all mappings are copied from the source index"); + } + if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings) + && IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings) > 1) { + throw new IllegalArgumentException("can not shrink index into more than one shard"); + } + + // now check that index is all on one node + final IndexRoutingTable table = state.routingTable().index(sourceIndex); + Map nodesToNumRouting = new HashMap<>(); + int numShards = sourceMetaData.getNumberOfShards(); + for (ShardRouting routing : table.shardsWithState(ShardRoutingState.STARTED)) { + nodesToNumRouting.computeIfAbsent(routing.currentNodeId(), (s) -> new AtomicInteger(0)).incrementAndGet(); + } + List nodesToAllocateOn = new ArrayList<>(); + for (Map.Entry entries : nodesToNumRouting.entrySet()) { + int numAllocations = entries.getValue().get(); + assert numAllocations <= numShards : "wait what? " + numAllocations + " is > than num shards " + numShards; + if (numAllocations == numShards) { + nodesToAllocateOn.add(entries.getKey()); + } + } + if (nodesToAllocateOn.isEmpty()) { + throw new IllegalStateException("index " + sourceIndex + + " must have all shards allocated on the same node to shrink index"); + } + return nodesToAllocateOn; + } + + static void prepareShrinkIndexSettings(ClusterState currentState, Set mappingKeys, Settings.Builder indexSettingsBuilder, Index shrinkFromIndex, String shrinkIntoName) { + final IndexMetaData sourceMetaData = currentState.metaData().index(shrinkFromIndex.getName()); + final List nodesToAllocateOn = validateShrinkIndex(currentState, shrinkFromIndex.getName(), + mappingKeys, shrinkIntoName, indexSettingsBuilder.build()); + final Predicate analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.") + || s.startsWith("index.analysis."); + indexSettingsBuilder + // we can only shrink to 1 index so far! + .put("index.number_of_shards", 1) + // we use "i.r.a.include" rather than "i.r.a.require" since it's allows one of the nodes holding an + // instanceof all shards. + .put("index.routing.allocation.include._id", + Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray())) + // we only try once and then give up with a shrink index + .put("index.allocation.max_retries", 1) + // now copy all similarity / analysis settings - this overrides all settings from the user unless they + // wanna add extra settings + .put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate)) + .put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName()) + .put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID()); + } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 8e099eb14f6..a57601abffc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -19,12 +19,14 @@ package org.elasticsearch.cluster.routing; +import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 201e5297511..fd9191fd713 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -72,6 +72,7 @@ import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestGetStoredSc import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutSearchTemplateAction; import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutStoredScriptAction; import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction; +import org.elasticsearch.rest.action.admin.indices.RestShrinkIndexAction; import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction; import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction; import org.elasticsearch.rest.action.admin.indices.alias.get.RestGetAliasesAction; @@ -209,6 +210,7 @@ public class NetworkModule extends AbstractModule { RestIndexPutAliasAction.class, RestIndicesAliasesAction.class, RestCreateIndexAction.class, + RestShrinkIndexAction.class, RestDeleteIndexAction.class, RestCloseIndexAction.class, RestOpenIndexAction.class, diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d4a147d475b..adcf36f694f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -35,8 +35,11 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RestoreSource; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -55,7 +58,10 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.SuspendableRefContainer; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; @@ -102,6 +108,7 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; @@ -125,6 +132,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; public class IndexShard extends AbstractIndexShardComponent { @@ -920,8 +928,7 @@ public class IndexShard extends AbstractIndexShardComponent { } else { openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } - final EngineConfig config = newEngineConfig(openMode, translogConfig, cachingPolicy, - new IndexShardRecoveryPerformer(shardId, mapperService, logger)); + final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); @@ -1109,20 +1116,37 @@ public class IndexShard extends AbstractIndexShardComponent { return path; } - public boolean recoverFromStore(DiscoveryNode localNode) { + public boolean recoverFromLocalShards(BiConsumer mappingUpdateConsumer, List localShards) throws IOException { + final List snapshots = new ArrayList<>(); + try { + for (IndexShard shard : localShards) { + snapshots.add(new LocalShardSnapshot(shard)); + } + + // we are the first primary, recover from the gateway + // if its post api allocation, the index should exists + assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard"; + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); + return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots); + } finally { + IOUtils.close(snapshots); + } + } + + public boolean recoverFromStore() { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData()); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromStore(this, shouldExist, localNode); + return storeRecovery.recoverFromStore(this, shouldExist); } - public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode localNode) { + public boolean restoreFromRepository(IndexShardRepository repository) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository, localNode); + return storeRecovery.recoverFromRepository(this, repository); } /** @@ -1337,7 +1361,8 @@ public class IndexShard extends AbstractIndexShardComponent { } public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService, - RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService) { + RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, + BiConsumer mappingUpdateConsumer, IndicesService indicesService) { final RestoreSource restoreSource = shardRouting.restoreSource(); if (shardRouting.isPeerRecovery()) { @@ -1357,19 +1382,59 @@ public class IndexShard extends AbstractIndexShardComponent { } } else if (restoreSource == null) { // recover from filesystem store - final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), - RecoveryState.Type.STORE, localNode, localNode); - markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - if (recoverFromStore(localNode)) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Throwable t) { - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true); - } - }); + IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); + Index mergeSourceIndex = indexMetaData.getMergeSourceIndex(); + final boolean recoverFromLocalShards = mergeSourceIndex != null && shardRouting.allocatedPostIndexCreate(indexMetaData) == false && shardRouting.primary(); + final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), + recoverFromLocalShards ? RecoveryState.Type.LOCAL_SHARDS : RecoveryState.Type.STORE, localNode, localNode); + if (recoverFromLocalShards) { + final List startedShards = new ArrayList<>(); + final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex); + final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1; + if (sourceIndexService != null) { + for (IndexShard shard : sourceIndexService) { + if (shard.state() == IndexShardState.STARTED) { + startedShards.add(shard); + } + } + } + if (numShards == startedShards.size()) { + markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + if (recoverFromLocalShards(mappingUpdateConsumer, startedShards)) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable t) { + recoveryListener.onRecoveryFailure(recoveryState, + new RecoveryFailedException(shardId, localNode, localNode, t), true); + } + }); + } else { + final Throwable t; + if (numShards == -1) { + t = new IndexNotFoundException(mergeSourceIndex); + } else { + t = new IllegalStateException("not all shards from index " + mergeSourceIndex + + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard " + + shardId()); + } + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, localNode, localNode, t), true); + } + } else { + markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + if (recoverFromStore()) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable t) { + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true); + } + + }); + } } else { // recover from a restore final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), @@ -1378,7 +1443,7 @@ public class IndexShard extends AbstractIndexShardComponent { threadPool.generic().execute(() -> { try { final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); - if (restoreFromRepository(indexShardRepository, localNode)) { + if (restoreFromRepository(indexShardRepository)) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Throwable first) { @@ -1461,7 +1526,8 @@ public class IndexShard extends AbstractIndexShardComponent { return mapperService.documentMapperWithAutoCreate(type); } - private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) { + private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { + final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger); return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, @@ -1627,5 +1693,4 @@ public class IndexShard extends AbstractIndexShardComponent { IndexShard.this.delete(engine, engineDelete); } } - } diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java new file mode 100644 index 00000000000..25c59caef99 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.NoLockFactory; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +final class LocalShardSnapshot implements Closeable { + private final IndexShard shard; + private final Store store; + private final IndexCommit indexCommit; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public LocalShardSnapshot(IndexShard shard) { + this.shard = shard; + store = shard.store(); + store.incRef(); + boolean success = false; + try { + indexCommit = shard.snapshotIndex(true); + success = true; + } finally { + if (success == false) { + store.decRef(); + } + } + } + + Index getIndex() { + return shard.indexSettings().getIndex(); + } + + Directory getSnapshotDirectory() { + /* this directory will not be used for anything else but reading / copying files to another directory + * we prevent all write operations on this directory with UOE - nobody should close it either. */ + return new FilterDirectory(store.directory()) { + @Override + public String[] listAll() throws IOException { + Collection fileNames = indexCommit.getFileNames(); + final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]); + return fileNameArray; + } + + @Override + public void deleteFile(String name) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public void sync(Collection names) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public void renameFile(String source, String dest) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + throw new UnsupportedOperationException("this directory is read-only"); + } + + @Override + public Lock obtainLock(String name) throws IOException { + /* we do explicitly a no-lock instance since we hold an index commit from a SnapshotDeletionPolicy so we + * can we certain that nobody messes with the files on disk. We also hold a ref on the store which means + * no external source will delete files either.*/ + return NoLockFactory.INSTANCE.obtainLock(in, name); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("nobody should close this directory wrapper"); + } + }; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + shard.releaseSnapshot(indexCommit); + } finally { + store.decRef(); + } + } + } + + ImmutableOpenMap getMappings() { + return shard.indexSettings.getIndexMetaData().getMappings(); + } + + @Override + public String toString() { + return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + indexCommit + "]"; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 574e68d8cc9..c7d6f35eff2 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -19,20 +19,27 @@ package org.elasticsearch.index.shard; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; @@ -40,6 +47,11 @@ import org.elasticsearch.indices.recovery.RecoveryState; import java.io.IOException; import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -63,12 +75,11 @@ final class StoreRecovery { * files / transaction logs. This * @param indexShard the index shard instance to recovery the shard into * @param indexShouldExists true iff the index should exist on disk ie. has the shard been allocated previously on the shards store. - * @param localNode the reference to the local node * @return true if the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. * @see Store */ - boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists, DiscoveryNode localNode) { + boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists) { if (canRecover(indexShard)) { if (indexShard.routingEntry().restoreSource() != null) { throw new IllegalStateException("can't recover - restore source is not null"); @@ -81,6 +92,141 @@ final class StoreRecovery { return false; } + boolean recoverFromLocalShards(BiConsumer mappingUpdateConsumer, final IndexShard indexShard, final List shards) throws IOException { + if (canRecover(indexShard)) { + assert indexShard.recoveryState().getType() == RecoveryState.Type.LOCAL_SHARDS : "invalid recovery type: " + indexShard.recoveryState().getType(); + if (indexShard.routingEntry().restoreSource() != null) { + throw new IllegalStateException("can't recover - restore source is not null"); + } + if (shards.isEmpty()) { + throw new IllegalArgumentException("shards must not be empty"); + } + Set indices = shards.stream().map((s) -> s.getIndex()).collect(Collectors.toSet()); + if (indices.size() > 1) { + throw new IllegalArgumentException("can't add shards from more than one index"); + } + for (ObjectObjectCursor mapping : shards.get(0).getMappings()) { + mappingUpdateConsumer.accept(mapping.key, mapping.value); + } + for (ObjectObjectCursor mapping : shards.get(0).getMappings()) { + indexShard.mapperService().merge(mapping.key,mapping.value.source(), MapperService.MergeReason.MAPPING_RECOVERY, true); + } + return executeRecovery(indexShard, () -> { + logger.debug("starting recovery from local shards {}", shards); + try { + final Directory directory = indexShard.store().directory(); // don't close this directory!! + addIndices(indexShard.recoveryState().getIndex(), directory, shards.stream().map(s -> s.getSnapshotDirectory()) + .collect(Collectors.toList()).toArray(new Directory[shards.size()])); + internalRecoverFromStore(indexShard, true); + // just trigger a merge to do housekeeping on the + // copied segments - we will also see them in stats etc. + indexShard.getEngine().forceMerge(false, -1, false, false, false); + } catch (IOException ex) { + throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); + } + + }); + } + return false; + } + + final void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Directory... sources) throws IOException { + /* + * TODO: once we upgraded to Lucene 6.1 use HardlinkCopyDirectoryWrapper to enable hardlinks if possible and enable it + * in the security.policy: + * + * grant codeBase "${codebase.lucene-misc-6.1.0.jar}" { + * // needed to allow shard shrinking to use hard-links if possible via lucenes HardlinkCopyDirectoryWrapper + * permission java.nio.file.LinkPermission "hard"; + * }; + * target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target); + */ + try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats), + new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.CREATE))) { + writer.addIndexes(sources); + writer.commit(); + } + } + + /** + * Directory wrapper that records copy process for recovery statistics + */ + static final class StatsDirectoryWrapper extends FilterDirectory { + private final RecoveryState.Index index; + + StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) { + super(in); + this.index = indexRecoveryStats; + } + + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { + final long l = from.fileLength(src); + final AtomicBoolean copies = new AtomicBoolean(false); + // here we wrap the index input form the source directory to report progress of file copy for the recovery stats. + // we increment the num bytes recovered in the readBytes method below, if users pull statistics they can see immediately + // how much has been recovered. + in.copyFrom(new FilterDirectory(from) { + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + index.addFileDetail(dest, l, false); + copies.set(true); + final IndexInput input = in.openInput(name, context); + return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") { + @Override + public void close() throws IOException { + input.close(); + } + + @Override + public long getFilePointer() { + throw new UnsupportedOperationException("only straight copies are supported"); + } + + @Override + public void seek(long pos) throws IOException { + throw new UnsupportedOperationException("seeks are not supported"); + } + + @Override + public long length() { + return input.length(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + throw new UnsupportedOperationException("slices are not supported"); + } + + @Override + public byte readByte() throws IOException { + throw new UnsupportedOperationException("use a buffer if you wanna perform well"); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + // we rely on the fact that copyFrom uses a buffer + input.readBytes(b, offset, len); + index.addRecoveredBytesToFile(dest, len); + } + }; + } + }, src, dest, context); + if (copies.get() == false) { + index.addFileDetail(dest, l, true); // hardlinked - we treat it as reused since the file was already somewhat there + } else { + assert index.getFileDetails(dest) != null : "File [" + dest + "] has no file details"; + assert index.getFileDetails(dest).recovered() == l : index.getFileDetails(dest).toString(); + } + } + } + /** * Recovers an index from a given {@link IndexShardRepository}. This method restores a * previously created index snapshot into an existing initializing shard. @@ -89,7 +235,7 @@ final class StoreRecovery { * @return true if the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) { + boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) { if (canRecover(indexShard)) { final ShardRouting shardRouting = indexShard.routingEntry(); if (shardRouting.restoreSource() == null) { @@ -200,29 +346,28 @@ final class StoreRecovery { // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) // its a "new index create" API, we have to do something, so better to clean it than use same data logger.trace("cleaning existing shard, shouldn't exists"); - IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); - writer.close(); + Lucene.cleanLuceneIndex(store.directory()); } } } catch (Throwable e) { throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e); } recoveryState.getIndex().updateVersion(version); - // since we recover from local, just fill the files and size try { final RecoveryState.Index index = recoveryState.getIndex(); - if (si != null) { - final Directory directory = store.directory(); - for (String name : Lucene.files(si)) { - long length = directory.fileLength(name); - index.addFileDetail(name, length, true); - } + if (si != null && recoveryState.getType() == RecoveryState.Type.STORE) { + addRecoveredFileDetails(si, store, index); } } catch (IOException e) { logger.debug("failed to list file details", e); } - indexShard.performTranslogRecovery(indexShouldExists); + if (recoveryState.getType() == RecoveryState.Type.LOCAL_SHARDS) { + assert indexShouldExists; + indexShard.skipTranslogRecovery(); + } else { + indexShard.performTranslogRecovery(indexShouldExists); + } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); } catch (EngineException | IOException e) { @@ -232,6 +377,14 @@ final class StoreRecovery { } } + private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException { + final Directory directory = store.directory(); + for (String name : Lucene.files(si)) { + long length = directory.fileLength(name); + index.addFileDetail(name, length, true); + } + } + /** * Restores shard from {@link RestoreSource} associated with this shard in routing table */ @@ -260,4 +413,5 @@ final class StoreRecovery { throw new IndexShardRestoreFailedException(shardId, "restore failed", t); } } + } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index bb162880384..a720f5cb258 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -682,7 +682,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref return refCounter.refCount(); } - private static final class StoreDirectory extends FilterDirectory { + static final class StoreDirectory extends FilterDirectory { private final ESLogger deletesLogger; @@ -715,7 +715,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref public String toString() { return "store(" + in.toString() + ")"; } - } /** diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 6f9b1684b9e..23518d4b3c3 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -68,6 +69,8 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; @@ -473,7 +476,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { + try { + nodeServicesProvider.getClient().admin().indices().preparePutMapping() + .setConcreteIndex(indexService.index()) // concrete index - no name clash, it uses uuid + .setType(type) + .setSource(mapping.source().string()) + .get(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to stringify mapping source", ex); + } + }, indicesService); } /** @@ -667,7 +679,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(channel)); + } +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5338f927005..f6e6c4aaed5 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -112,7 +112,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet; * method. *

* Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(IndexShardRepository, DiscoveryNode)} )} + * {@link IndexShard#restoreFromRepository(IndexShardRepository)} )} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. *

diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 3b1040446e0..8ee38c86dc8 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -28,23 +28,31 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.junit.Ignore; import java.util.HashMap; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -277,4 +285,98 @@ public class CreateIndexIT extends ESIntegTestCase { internalCluster().fullRestart(); ensureGreen("test"); } + + public void testCreateShrinkIndex() { + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7))).get(); + for (int i = 0; i < 20; i++) { + client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); + } + ImmutableOpenMap dataNodes = client().admin().cluster().prepareState().get().getState().nodes() + .getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); + String mergeNode = discoveryNodes[0].getName(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source") + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", mergeNode) + .put("index.blocks.write", true)).get(); + ensureGreen(); + // now merge source into a single shard index + assertAcked(client().admin().indices().prepareShrinkIndex("source", "target") + .setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get()); + ensureGreen(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + // let it be allocated anywhere and bump replicas + client().admin().indices().prepareUpdateSettings("target") + .setSettings(Settings.builder() + .putNull("index.routing.allocation.include._id") + .put("index.number_of_replicas", 1)).get(); + ensureGreen(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + for (int i = 20; i < 40; i++) { + client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + } + /** + * Tests that we can manually recover from a failed allocation due to shards being moved away etc. + */ + public void testCreateShrinkIndexFails() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()) + .put("number_of_shards", randomIntBetween(2, 7)) + .put("number_of_replicas", 0)).get(); + for (int i = 0; i < 20; i++) { + client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); + } + ImmutableOpenMap dataNodes = client().admin().cluster().prepareState().get().getState().nodes() + .getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); + String spareNode = discoveryNodes[0].getName(); + String mergeNode = discoveryNodes[1].getName(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode) + .put("index.blocks.write", true)).get(); + ensureGreen(); + // now merge source into a single shard index + client().admin().indices().prepareShrinkIndex("source", "target") + .setSettings(Settings.builder() + .put("index.routing.allocation.exclude._name", mergeNode) // we manually exclude the merge node to forcefully fuck it up + .put("index.number_of_replicas", 0) + .put("index.allocation.max_retries", 1).build()).get(); + + // now we move all shards away from the merge node + client().admin().indices().prepareUpdateSettings("source") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", spareNode) + .put("index.blocks.write", true)).get(); + ensureGreen("source"); + + client().admin().indices().prepareUpdateSettings("target") // erase the forcefully fuckup! + .setSettings(Settings.builder().putNull("index.routing.allocation.exclude._name")).get(); + // wait until it fails + assertBusy(() -> { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + RoutingTable routingTables = clusterStateResponse.getState().routingTable(); + assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned()); + assertEquals(UnassignedInfo.Reason.ALLOCATION_FAILED, + routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason()); + assertEquals(1, + routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations()); + }); + client().admin().indices().prepareUpdateSettings("source") // now relocate them all to the right node + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", mergeNode)).get(); + ensureGreen("source"); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); // kick off a retry and wait until it's done! + ensureGreen(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java new file mode 100644 index 00000000000..50bf8715f19 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shrink; + +import org.apache.lucene.index.IndexWriter; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static java.util.Collections.emptyMap; + +public class TransportShrinkActionTests extends ESTestCase { + + private ClusterState createClusterState(String name, int numShards, int numReplicas, Settings settings) { + MetaData.Builder metaBuilder = MetaData.builder(); + IndexMetaData indexMetaData = IndexMetaData.builder(name).settings(settings(Version.CURRENT) + .put(settings)) + .numberOfShards(numShards).numberOfReplicas(numReplicas).build(); + metaBuilder.put(indexMetaData, false); + MetaData metaData = metaBuilder.build(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.addAsNew(metaData.index(name)); + + RoutingTable routingTable = routingTableBuilder.build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData).routingTable(routingTable).blocks(ClusterBlocks.builder().addBlocks(indexMetaData)).build(); + return clusterState; + } + + public void testErrorCondition() { + ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), + Settings.builder().put("index.blocks.write", true).build()); + DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000)); + + assertEquals("Can't merge index with more than [2147483519] docs - too many documents", + expectThrows(IllegalStateException.class, () -> + TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state, + new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY)) + ).getMessage()); + + + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0, + Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState, stats, + new IndexNameExpressionResolver(Settings.EMPTY)); + } + + public void testShrinkIndexSettings() { + String indexName = randomAsciiOfLength(10); + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0, + Settings.builder() + .put("index.blocks.write", true) + .build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000)); + ShrinkRequest target = new ShrinkRequest("target", indexName); + CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( + target, clusterState, stats, new IndexNameExpressionResolver(Settings.EMPTY)); + assertNotNull(request.shrinkFrom()); + assertEquals(indexName, request.shrinkFrom().getName()); + assertEquals("1", request.settings().get("index.number_of_shards")); + assertEquals("shrink_index", request.cause()); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, emptyMap(), + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), Version.CURRENT); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java new file mode 100644 index 00000000000..49d1da0ddec --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static java.util.Collections.emptyMap; + +public class MetaDataCreateIndexServiceTests extends ESTestCase { + + private ClusterState createClusterState(String name, int numShards, int numReplicas, Settings settings) { + MetaData.Builder metaBuilder = MetaData.builder(); + IndexMetaData indexMetaData = IndexMetaData.builder(name).settings(settings(Version.CURRENT) + .put(settings)) + .numberOfShards(numShards).numberOfReplicas(numReplicas).build(); + metaBuilder.put(indexMetaData, false); + MetaData metaData = metaBuilder.build(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.addAsNew(metaData.index(name)); + + RoutingTable routingTable = routingTableBuilder.build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData).routingTable(routingTable).blocks(ClusterBlocks.builder().addBlocks(indexMetaData)).build(); + return clusterState; + } + + public void testValidateShrinkIndex() { + ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), + Settings.builder().put("index.blocks.write", true).build()); + + assertEquals("index [source] already exists", + expectThrows(IndexAlreadyExistsException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "target", Collections.emptySet(), "source", Settings.EMPTY) + ).getMessage()); + + assertEquals("no such index", + expectThrows(IndexNotFoundException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "no such index", Collections.emptySet(), "target", Settings.EMPTY) + ).getMessage()); + + assertEquals("can't shrink an index with only one shard", + expectThrows(IllegalArgumentException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0, + Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), + "target", Settings.EMPTY) + ).getMessage()); + + assertEquals("index source must be read-only to shrink index. use \"index.blocks.write=true\"", + expectThrows(IllegalStateException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex( + createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), Settings.EMPTY) + , "source", Collections.emptySet(), "target", Settings.EMPTY) + ).getMessage()); + + assertEquals("index source must have all shards allocated on the same node to shrink index", + expectThrows(IllegalStateException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", Settings.EMPTY) + + ).getMessage()); + + assertEquals("can not shrink index into more than one shard", + expectThrows(IllegalArgumentException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", + Settings.builder().put("index.number_of_shards", 2).build()) + ).getMessage()); + + assertEquals("mappings are not allowed when shrinking indices, all mappings are copied from the source index", + expectThrows(IllegalArgumentException.class, () -> { + MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.singleton("foo"), + "target", Settings.EMPTY); + } + ).getMessage()); + + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0, + Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target", Settings.EMPTY); + } + + public void testShrinkIndexSettings() { + String indexName = randomAsciiOfLength(10); + // create one that won't fail + ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0, + Settings.builder() + .put("index.blocks.write", true) + .put("index.similarity.default.type", "BM25") + .put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword") + .build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) + .build(); + AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, + Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = service.applyStartedShards(clusterState, + routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + Settings.Builder builder = Settings.builder(); + MetaDataCreateIndexService.prepareShrinkIndexSettings( + clusterState, Collections.emptySet(), builder, clusterState.metaData().index(indexName).getIndex(), "target"); + assertEquals("1", builder.build().get("index.number_of_shards")); + assertEquals("similarity settings must be copied", "BM25", builder.build().get("index.similarity.default.type")); + assertEquals("analysis settings must be copied", + "keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer")); + assertEquals("node1", builder.build().get("index.routing.allocation.include._id")); + assertEquals("1", builder.build().get("index.allocation.max_retries")); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, emptyMap(), + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), Version.CURRENT); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java index 81d5c45c39b..707bfd9057e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java @@ -52,6 +52,12 @@ public class ShardRoutingHelper { return routing.reinitializeShard().updateUnassignedInfo(new UnassignedInfo(reason, "test_reinit")); } + public static ShardRouting initWithSameId(ShardRouting copy) { + return new ShardRouting(copy.shardId(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), + copy.primary(), ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, null), + copy.allocationId(), copy.getExpectedShardSize()); + } + public static ShardRouting moveToUnassigned(ShardRouting routing, UnassignedInfo info) { return routing.moveToUnassigned(info); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f96b546855b..ccc1f0a6b36 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -31,9 +31,11 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -46,6 +48,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; @@ -58,6 +61,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; @@ -118,6 +122,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -975,7 +980,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -1003,7 +1008,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -1037,7 +1042,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, false); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); try { - newShard.recoverFromStore(localNode); + newShard.recoverFromStore(); fail("index not there!"); } catch (IndexShardRecoveryException ex) { assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over")); @@ -1056,7 +1061,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard = test.createShard(routing); newShard.updateRoutingEntry(routing, false); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode)); + assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore()); newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true); SearchResponse response = client().prepareSearch().get(); @@ -1142,7 +1147,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void verify(String verificationToken) { } - }, localNode)); + })); test_target_shard.updateRoutingEntry(routing.moveToStarted(), true); assertHitCount(client().prepareSearch("test_target").get(), 1); @@ -1398,7 +1403,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode)); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true); return newShard; } @@ -1507,4 +1512,83 @@ public class IndexShardTests extends ESSingleNodeTestCase { // Shard should now be active since we did recover: assertTrue(newShard.isActive()); } + + public void testRecoverFromLocalShard() throws IOException { + createIndex("index"); + createIndex("index_1"); + createIndex("index_2"); + client().admin().indices().preparePutMapping("index").setType("test").setSource(jsonBuilder().startObject() + .startObject("test") + .startObject("properties") + .startObject("foo") + .field("type", "text") + .endObject() + .endObject().endObject().endObject()).get(); + client().prepareIndex("index", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + client().prepareIndex("index", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + + + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("index_1")); + IndexShard shard = test.getShardOrNull(0); + ShardRouting routing = ShardRoutingHelper.initWithSameId(shard.routingEntry()); + test.removeShard(0, "b/c simon says so"); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); + { + final IndexShard newShard = test.createShard(routing); + newShard.updateRoutingEntry(routing, false); + newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode)); + + BiConsumer mappingConsumer = (type, mapping) -> { + try { + client().admin().indices().preparePutMapping().setConcreteIndex(newShard.indexSettings().getIndex()) + .setType(type) + .setSource(mapping.source().string()) + .get(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to stringify mapping source", ex); + } + }; + expectThrows(IllegalArgumentException.class, () -> { + IndexService index = indicesService.indexService(resolveIndex("index")); + IndexService index_2 = indicesService.indexService(resolveIndex("index_2")); + newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(index.getShard(0), index_2.getShard(0))); + }); + + IndexService indexService = indicesService.indexService(resolveIndex("index")); + assertTrue(newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(indexService.getShard(0)))); + RecoveryState recoveryState = newShard.recoveryState(); + assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage()); + assertTrue(recoveryState.getIndex().fileDetails().size() > 0); + for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) { + if (file.reused()) { + assertEquals(file.recovered(), 0); + } else { + assertEquals(file.recovered(), file.length()); + } + } + routing = ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + assertHitCount(client().prepareSearch("index_1").get(), 2); + } + // now check that it's persistent ie. that the added shards are committed + { + routing = shard.routingEntry(); + test.removeShard(0, "b/c simon says so"); + routing = ShardRoutingHelper.reinit(routing); + final IndexShard newShard = test.createShard(routing); + newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode)); + assertTrue(newShard.recoverFromStore()); + routing = ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + assertHitCount(client().prepareSearch("index_1").get(), 2); + } + + GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("index_1").get(); + ImmutableOpenMap> mappings = mappingsResponse.getMappings(); + assertNotNull(mappings.get("index_1")); + assertNotNull(mappings.get("index_1").get("test")); + assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}"); + + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java new file mode 100644 index 00000000000..a46e0f7562d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.Version; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.security.AccessControlException; +import java.util.Arrays; +import java.util.function.Predicate; + +public class StoreRecoveryTests extends ESTestCase { + + public void testAddIndices() throws IOException { + Directory[] dirs = new Directory[randomIntBetween(1, 10)]; + final int numDocs = randomIntBetween(50, 100); + int id = 0; + for (int i = 0; i < dirs.length; i++) { + dirs[i] = newFSDirectory(createTempDir()); + IndexWriter writer = new IndexWriter(dirs[i], newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.CREATE)); + for (int j = 0; j < numDocs; j++) { + writer.addDocument(Arrays.asList(new StringField("id", Integer.toString(id++), Field.Store.YES))); + } + + writer.commit(); + writer.close(); + } + StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger); + RecoveryState.Index indexStats = new RecoveryState.Index(); + Directory target = newFSDirectory(createTempDir()); + storeRecovery.addIndices(indexStats, target, dirs); + int numFiles = 0; + Predicate filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false + && f.startsWith("extra") == false; + for (Directory d : dirs) { + numFiles += Arrays.asList(d.listAll()).stream().filter(filesFilter).count(); + } + final long targetNumFiles = Arrays.asList(target.listAll()).stream().filter(filesFilter).count(); + assertEquals(numFiles, targetNumFiles); + assertEquals(indexStats.totalFileCount(), targetNumFiles); + if (hardLinksSupported(createTempDir())) { + assertEquals("upgrade to HardlinkCopyDirectoryWrapper in Lucene 6.1", Version.LATEST, Version.LUCENE_6_0_0); + // assertEquals(indexStats.reusedFileCount(), targetNumFiles); -- uncomment this once upgraded to Lucene 6.1 + assertEquals(indexStats.reusedFileCount(), 0); + } else { + assertEquals(indexStats.reusedFileCount(), 0); + } + DirectoryReader reader = DirectoryReader.open(target); + SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target); + for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge + assertEquals("all sources must be flush", info.info.getDiagnostics().get("source"), "flush"); + } + assertEquals(reader.numDeletedDocs(), 0); + assertEquals(reader.numDocs(), id); + reader.close(); + target.close(); + IOUtils.close(dirs); + } + + public void testStatsDirWrapper() throws IOException { + Directory dir = newDirectory(); + Directory target = newDirectory(); + RecoveryState.Index indexStats = new RecoveryState.Index(); + StoreRecovery.StatsDirectoryWrapper wrapper = new StoreRecovery.StatsDirectoryWrapper(target, indexStats); + try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) { + CodecUtil.writeHeader(output, "foo", 0); + int numBytes = randomIntBetween(100, 20000); + for (int i = 0; i < numBytes; i++) { + output.writeByte((byte)i); + } + CodecUtil.writeFooter(output); + } + wrapper.copyFrom(dir, "foo.bar", "bar.foo", IOContext.DEFAULT); + assertNotNull(indexStats.getFileDetails("bar.foo")); + assertNull(indexStats.getFileDetails("foo.bar")); + assertEquals(dir.fileLength("foo.bar"), indexStats.getFileDetails("bar.foo").length()); + assertEquals(dir.fileLength("foo.bar"), indexStats.getFileDetails("bar.foo").recovered()); + assertFalse(indexStats.getFileDetails("bar.foo").reused()); + IOUtils.close(dir, target); + } + + public boolean hardLinksSupported(Path path) throws IOException { + try { + Files.createFile(path.resolve("foo.bar")); + Files.createLink(path.resolve("test"), path.resolve("foo.bar")); + BasicFileAttributes destAttr = Files.readAttributes(path.resolve("test"), BasicFileAttributes.class); + BasicFileAttributes sourceAttr = Files.readAttributes(path.resolve("foo.bar"), BasicFileAttributes.class); + // we won't get here - no permission ;) + return destAttr.fileKey() != null + && destAttr.fileKey().equals(sourceAttr.fileKey()); + } catch (AccessControlException ex) { + return true; // if we run into that situation we know it's supported. + } catch (UnsupportedOperationException ex) { + return false; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java index 8f64390dbd3..be45b1ad330 100644 --- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -95,9 +95,7 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; public class StoreTests extends ESTestCase { diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 4b11bb1bc96..2722fc9d9d3 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -453,7 +453,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertEquals(1, imc.availableShards().size()); - assertTrue(newShard.recoverFromStore(localNode)); + assertTrue(newShard.recoverFromStore()); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); newShard.updateRoutingEntry(routing.moveToStarted(), true); } finally { diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index bbb1e8bf2bf..8d59da7da01 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -106,7 +106,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode)); - shard.recoverFromStore(localNode); + shard.recoverFromStore(); newRouting = ShardRoutingHelper.moveToStarted(newRouting); shard.updateRoutingEntry(newRouting, true); } finally { diff --git a/docs/reference/indices.asciidoc b/docs/reference/indices.asciidoc index 7543a3f0bbb..86b8bfbf342 100644 --- a/docs/reference/indices.asciidoc +++ b/docs/reference/indices.asciidoc @@ -15,6 +15,7 @@ index settings, aliases, mappings, and index templates. * <> * <> * <> +* <> [float] [[mapping-management]] diff --git a/docs/reference/indices/shrink-index.asciidoc b/docs/reference/indices/shrink-index.asciidoc new file mode 100644 index 00000000000..28532db5240 --- /dev/null +++ b/docs/reference/indices/shrink-index.asciidoc @@ -0,0 +1,83 @@ +[[indices-shrink-index]] +== Shrink Index + +The shrink index API allows to shrink an existing index into a new index with a single shard. +In order to shrink an index, all its shards must be allocated on a single node in the cluster. +This is required since the shrink command will copy all shards index files into the target index +data folder when the primary of the target index is initially allocated. + +When an index is shrunk no write operations should happen to the source index. Elasticsearch will +enforce the `read-only` property when the shrink command is executed. All operations necessary to shrink the +source index are executed during initial primary recovery. Once the target index primary shard is started the +shrink operation has successfully finished. To monitor status and progress use <> + + +To shrink and index all shards of that index must be allocated on a single node. + +[source,js] +-------------------------------------------------- +$ curl -XPUT 'http://localhost:9200/logs/_settings' -d '{ + "settings" : { + "index.routing.allocation.require._name" : "shrink_node_name", <1> + "index.blocks.write" : true <2> + } +}' +-------------------------------------------------- +<1> Forces the relocation of all of the indices shards to the node `shrink_node_name` +<2> Prevents write operations to this index while still allowing metadata changes like deleting the index. + +The above second curl example shows how an index called `logs` can be +forced to allocate at least one copy of each shard on a specific node in the cluster. + +The `_shrink` API is similar to <> and accepts `settings` and `aliases` for the target index. + +[source,js] +-------------------------------------------------- +$ curl -XPUT 'http://localhost:9200/logs/_shrink/logs_single_shard' -d '{ + "settings" : { + "index.codec" : "best_compression", <1> + "index.number_of_replicas" : 0 <2> + } +}' +-------------------------------------------------- +<1> Enables `best_compression` codec on the target index +<2> Sets the number of replicas on the target index to `0` to ensure the cluster is green once the shard initialized + +The API call above returns immediately once the target index is created but doesn't wait +for the shrink operation to start. Once the target indices primary shard moves to state `initializing` +the shrink operation has started. + +Once the index is shrunk replicas can be set to `1` and allocation filtering can be removed. + +[source,js] +-------------------------------------------------- +$ curl -XPUT 'http://localhost:9200/logs_single_shard/_settings' -d '{ + "settings" : { + "index.routing.allocation.include._id" : null, <1> + "index.number_of_replicas" : 1 <2> + } +}' +-------------------------------------------------- + +<1> Resets the allocation filtering for the new shrunk index to allow replica allocation +<2> Bumps the number of replicas to 1 + + + +[float] +[[shrink-index-limitations]] +=== Limitations + +Indices can only be shrunk into a single shard if they fully the following requirements: + + * an instance of all of the indices shards must be allocated on a single node + * the index must not contain more than `2.14 billion` documents (`2147483519`) in total (sum of all shards) + This is the maximum shard size elasticsearch can support. + * the index must have more than one shard + * the index must be `read-only`, ie. have a cluster block set `index.blocks.write=true` + * the target index must not exist + * all `index.analysis.*` and `index.similarity.*` settings passed to the `_shrink` call will be overwritten with the + source indices settings. + * if the target index can't be allocated on the shrink node, due to throttling or other allocation deciders, + its primary shard will stay `unassigned` until it can be allocated on that node + diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java index b368a3e99e7..3dad35adaa2 100644 --- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java +++ b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbMMapDirectoryTests.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.nio.file.Path; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MMapDirectory; -import org.elasticsearch.index.store.SmbDirectoryWrapper; -public class SmbMMapDirectoryTests extends ESBaseDirectoryTestCase { +public class SmbMMapDirectoryTests extends EsBaseDirectoryTestCase { @Override protected Directory getDirectory(Path file) throws IOException { diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java index d18057a106b..659f6eff7ac 100644 --- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java +++ b/plugins/store-smb/src/test/java/org/elasticsearch/index/store/SmbSimpleFSDirectoryTests.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.nio.file.Path; import org.apache.lucene.store.Directory; import org.apache.lucene.store.SimpleFSDirectory; -import org.elasticsearch.index.store.SmbDirectoryWrapper; -public class SmbSimpleFSDirectoryTests extends ESBaseDirectoryTestCase { +public class SmbSimpleFSDirectoryTests extends EsBaseDirectoryTestCase { @Override protected Directory getDirectory(Path file) throws IOException { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json new file mode 100644 index 00000000000..633e9e16093 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json @@ -0,0 +1,35 @@ +{ + "indices.shrink": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shrink-index.html", + "methods": ["PUT", "POST"], + "url": { + "path": "/{index}/_shrink/{target}", + "paths": ["/{index}/_shrink/{target}"], + "parts": { + "index": { + "type" : "string", + "required" : true, + "description" : "The name of the source index to shrink" + }, + "target": { + "type" : "string", + "required" : true, + "description" : "The name of the target index to shrink into" + } + }, + "params": { + "timeout": { + "type" : "time", + "description" : "Explicit operation timeout" + }, + "master_timeout": { + "type" : "time", + "description" : "Specify timeout for connection to master" + } + } + }, + "body": { + "description" : "The configuration for the target index (`settings` and `aliases`)" + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml new file mode 100644 index 00000000000..4ef428b9600 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml @@ -0,0 +1,72 @@ +--- +"Shrink index via API": + # creates an index with one document. + # relocates all it's shards to one node + # shrinks it into a new index with a single shard + - do: + indices.create: + index: source + body: + settings: + number_of_replicas: "0" + - do: + index: + index: source + type: test + id: "1" + body: { "foo": "hello world" } + + - do: + get: + index: source + type: test + id: "1" + + - match: { _index: source } + - match: { _type: test } + - match: { _id: "1" } + - match: { _source: { foo: "hello world" } } + + - do: + cluster.state: {} + + # Get master node id + - set: { master_node: master } + + # relocate everything to the master node and make it read-only + - do: + indices.put_settings: + index: source + body: + index.routing.allocation.include._id: $master + index.blocks.write: true + index.number_of_replicas: 0 + + - do: + cluster.health: + wait_for_status: green + index: source + wait_for_relocating_shards: 0 + + # now we do the actual shrink + - do: + indices.shrink: + index: "source" + target: "target" + body: + index.number_of_replicas: 0 + + - do: + cluster.health: + wait_for_status: green + + - do: + get: + index: target + type: test + id: "1" + + - match: { _index: target } + - match: { _type: test } + - match: { _id: "1" } + - match: { _source: { foo: "hello world" } } diff --git a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java similarity index 87% rename from plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java rename to test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java index 855ddce9ebd..638d24e7f9c 100644 --- a/plugins/store-smb/src/test/java/org/elasticsearch/index/store/ESBaseDirectoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/store/EsBaseDirectoryTestCase.java @@ -25,6 +25,8 @@ import org.apache.lucene.store.BaseDirectoryTestCase; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TimeUnits; import org.elasticsearch.bootstrap.BootstrapForTesting; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; /** @@ -36,8 +38,11 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; @TimeoutSuite(millis = TimeUnits.HOUR) @LuceneTestCase.SuppressReproduceLine @LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose") -public abstract class ESBaseDirectoryTestCase extends BaseDirectoryTestCase { +public abstract class EsBaseDirectoryTestCase extends BaseDirectoryTestCase { static { BootstrapForTesting.ensureInitialized(); } + + protected final ESLogger logger = Loggers.getLogger(getClass()); + }