Add primitive to shrink an index into a single shard (#18270)

This adds a low-level primitive operation to shrink an existing
index into a new index with a single shard. This primitive expects
all shards of the source index to be allocated on a single node. Once the target index is initializing on the shrink node, it takes a snapshot of the source index shards and copies all files into the target index's data folder. An [optimization](https://issues.apache.org/jira/browse/LUCENE-7300) coming in Lucene 6.1 will also allow for an optional constant-time copy if hard-links are supported by the filesystem. All mappings are merged into the new index's metadata once the snapshots have been taken on the shrink node.

To shrink an existing index, all of its shards must be moved to a single node (one instance of each shard) and the index must be made read-only:

```BASH
$ curl -XPUT 'http://localhost:9200/logs/_settings' -d '{
    "settings" : {
        "index.routing.allocation.require._name" : "shrink_node_name",
        "index.blocks.write" : true 
    }
}'
```
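
Before creating the shrunken index it is worth verifying that the relocation has finished; for example, the cat shards API should show every copy of the `logs` shards on the shrink node:

```BASH
$ curl -XGET 'http://localhost:9200/_cat/shards/logs?v'
```
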
Once all shards are started on the shrink node, the new index can be created via:

```BASH
$ curl -XPUT 'http://localhost:9200/logs/_shrink/logs_single_shard' -d '{
    "settings" : {
        "index.codec" : "best_compression",
        "index.number_of_replicas" : 1
    }
}'
```

This API performs all needed checks before the new index is created and selects the shrink node based on the allocation of the source index. The call returns immediately; to monitor shrink progress, use the recovery API, where all copy operations are reflected with byte-level copy progress etc.
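
For example, while the shrink is running, a plain recovery request against the target index from the example above should list the copy operations (they show up as recoveries of type `LOCAL_SHARDS`) along with their byte progress:

```BASH
$ curl -XGET 'http://localhost:9200/logs_single_shard/_recovery?pretty'
```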

The shrink operation does not modify the source index. If a shrink operation needs to
be canceled, or if the shrink failed, the target index can simply be deleted and
all resources are released.
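
For example, aborting or cleaning up the shrink from above only requires deleting the target index, since the source index was never touched:

```BASH
$ curl -XDELETE 'http://localhost:9200/logs_single_shard'
```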
Simon Willnauer 2016-05-31 10:41:44 +02:00
parent c911f4d951
commit 502a775a7c
40 changed files with 1867 additions and 68 deletions


@@ -115,6 +115,8 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettin
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shrink.ShrinkAction;
import org.elasticsearch.action.admin.indices.shrink.TransportShrinkAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
@@ -286,6 +288,7 @@ public class ActionModule extends AbstractModule {
registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
registerAction(ShrinkAction.INSTANCE, TransportShrinkAction.class);
registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);


@@ -40,7 +40,7 @@ public interface IndicesRequest {
*/
IndicesOptions indicesOptions();
static interface Replaceable extends IndicesRequest {
interface Replaceable extends IndicesRequest {
/**
* Sets the indices that the action relates to.
*/


@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.transport.TransportMessage;
import java.util.HashMap;
@@ -40,6 +41,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
private final String cause;
private final String index;
private final boolean updateAllTypes;
private Index shrinkFrom;
private IndexMetaData.State state = IndexMetaData.State.OPEN;
@@ -54,7 +56,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
private final Set<ClusterBlock> blocks = new HashSet<>();
CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
this.originalMessage = originalMessage;
this.cause = cause;
this.index = index;
@@ -91,6 +93,11 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
return this;
}
public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) {
this.shrinkFrom = shrinkFrom;
return this;
}
public TransportMessage originalMessage() {
return originalMessage;
}
@@ -127,6 +134,10 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
return blocks;
}
public Index shrinkFrom() {
return shrinkFrom;
}
/** True if all fields that span multiple types should be updated, false otherwise */
public boolean updateAllTypes() {
return updateAllTypes;


@@ -30,10 +30,10 @@ import java.io.IOException;
*/
public class CreateIndexResponse extends AcknowledgedResponse {
CreateIndexResponse() {
protected CreateIndexResponse() {
}
CreateIndexResponse(boolean acknowledged) {
protected CreateIndexResponse(boolean acknowledged) {
super(acknowledged);
}


@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shrink;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
*/
public class ShrinkAction extends Action<ShrinkRequest, ShrinkResponse, ShrinkRequestBuilder> {
public static final ShrinkAction INSTANCE = new ShrinkAction();
public static final String NAME = "indices:admin/shrink";
private ShrinkAction() {
super(NAME);
}
@Override
public ShrinkResponse newResponse() {
return new ShrinkResponse();
}
@Override
public ShrinkRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new ShrinkRequestBuilder(client, this);
}
}


@@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shrink;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Request class to shrink an index into a single shard
*/
public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements IndicesRequest {
private CreateIndexRequest shrinkIndexRequest;
private String sourceIndex;
ShrinkRequest() {}
public ShrinkRequest(String targetIndex, String sourceIndex) {
this.shrinkIndexRequest = new CreateIndexRequest(targetIndex);
this.sourceIndex = sourceIndex;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = shrinkIndexRequest == null ? null : shrinkIndexRequest.validate();
if (sourceIndex == null) {
validationException = addValidationError("source index is missing", validationException);
}
if (shrinkIndexRequest == null) {
validationException = addValidationError("shrink index request is missing", validationException);
}
return validationException;
}
public void setSourceIndex(String index) {
this.sourceIndex = index;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shrinkIndexRequest = new CreateIndexRequest();
shrinkIndexRequest.readFrom(in);
sourceIndex = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shrinkIndexRequest.writeTo(out);
out.writeString(sourceIndex);
}
@Override
public String[] indices() {
return new String[] {sourceIndex};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandOpen();
}
public void setShrinkIndex(CreateIndexRequest shrinkIndexRequest) {
this.shrinkIndexRequest = Objects.requireNonNull(shrinkIndexRequest, "shrink index request must not be null");
}
/**
* Returns the {@link CreateIndexRequest} for the shrink index
*/
public CreateIndexRequest getShrinkIndexRequest() {
return shrinkIndexRequest;
}
/**
* Returns the source index name
*/
public String getSourceIndex() {
return sourceIndex;
}
}


@@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shrink;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder<ShrinkRequest, ShrinkResponse,
ShrinkRequestBuilder> {
public ShrinkRequestBuilder(ElasticsearchClient client, ShrinkAction action) {
super(client, action, new ShrinkRequest());
}
public ShrinkRequestBuilder setTargetIndex(CreateIndexRequest request) {
this.request.setShrinkIndex(request);
return this;
}
public ShrinkRequestBuilder setSourceIndex(String index) {
this.request.setSourceIndex(index);
return this;
}
public ShrinkRequestBuilder setSettings(Settings settings) {
this.request.getShrinkIndexRequest().settings(settings);
return this;
}
}


@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shrink;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
public final class ShrinkResponse extends CreateIndexResponse {
ShrinkResponse() {
}
ShrinkResponse(boolean acknowledged) {
super(acknowledged);
}
}


@@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shrink;
import org.apache.lucene.index.IndexWriter;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
/**
* Main class to initiate shrinking an index into a new index with a single shard
*/
public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkRequest, ShrinkResponse> {
private final MetaDataCreateIndexService createIndexService;
private final Client client;
@Inject
public TransportShrinkAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
super(settings, ShrinkAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
ShrinkRequest::new);
this.createIndexService = createIndexService;
this.client = client;
}
@Override
protected String executor() {
// we go async right away
return ThreadPool.Names.SAME;
}
@Override
protected ShrinkResponse newResponse() {
return new ShrinkResponse();
}
@Override
protected ClusterBlockException checkBlock(ShrinkRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexRequest().index());
}
@Override
protected void masterOperation(final ShrinkRequest shrinkRequest, final ClusterState state,
final ActionListener<ShrinkResponse> listener) {
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkRequest.getSourceIndex());
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(shrinkRequest, state,
indicesStatsResponse.getTotal().getDocs(), indexNameExpressionResolver);
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new ShrinkResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create shrink index", t, updateRequest.index());
} else {
logger.debug("[{}] failed to create shrink index", t, updateRequest.index());
}
listener.onFailure(t);
}
});
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
// static for unit testing this method
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ShrinkRequest shrinkRequest, final ClusterState state,
final DocsStats docsStats, IndexNameExpressionResolver indexNameExpressionResolver) {
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkRequest.getSourceIndex());
final CreateIndexRequest targetIndex = shrinkRequest.getShrinkIndexRequest();
final String targetIndexName = indexNameExpressionResolver.resolveDateMathExpression(targetIndex.index());
final IndexMetaData metaData = state.metaData().index(sourceIndex);
final Settings targetIndexSettings = Settings.builder().put(targetIndex.settings())
.normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
long count = docsStats.getCount();
if (count >= IndexWriter.MAX_DOCS) {
throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS
+ "] docs - too many documents");
}
targetIndex.cause("shrink_index");
targetIndex.settings(Settings.builder()
.put(targetIndexSettings)
// we can only shrink to 1 shard so far!
.put("index.number_of_shards", 1)
);
return new CreateIndexClusterStateUpdateRequest(targetIndex,
"shrink_index", targetIndexName, true)
// mappings are updated on the node when merging in the shards; this prevents race conditions since all mappings must be
// applied once we have taken the snapshot. if somebody switches the index back to read/write and adds docs, we would miss
// the mappings for them and everything would be corrupted and hard to debug
.ackTimeout(targetIndex.timeout())
.masterNodeTimeout(targetIndex.masterNodeTimeout())
.settings(targetIndex.settings())
.aliases(targetIndex.aliases())
.customs(targetIndex.customs())
.shrinkFrom(metaData.getIndex());
}
}


@@ -35,12 +35,12 @@ import java.util.Map;
/** custom policy for union of static and dynamic permissions */
final class ESPolicy extends Policy {
/** template policy file, the one used in tests */
static final String POLICY_RESOURCE = "security.policy";
/** limited policy for scripts */
static final String UNTRUSTED_RESOURCE = "untrusted.policy";
final Policy template;
final Policy untrusted;
final Policy system;
@@ -60,7 +60,7 @@ final class ESPolicy extends Policy {
}
@Override @SuppressForbidden(reason = "fast equals check is desired")
public boolean implies(ProtectionDomain domain, Permission permission) {
CodeSource codeSource = domain.getCodeSource();
// codesource can be null when reducing privileges via doPrivileged()
if (codeSource == null) {


@@ -92,6 +92,9 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRespons
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest;
import org.elasticsearch.action.admin.indices.shrink.ShrinkRequestBuilder;
import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -782,4 +785,19 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
GetSettingsRequestBuilder prepareGetSettings(String... indices);
/**
* Shrinks an index using an explicit request, allowing the settings, mappings and aliases of the target index to be specified.
*/
ShrinkRequestBuilder prepareShrinkIndex(String sourceIndex, String targetIndex);
/**
* Shrinks an index using an explicit request, allowing the settings, mappings and aliases of the target index to be specified.
*/
ActionFuture<ShrinkResponse> shrinkIndex(ShrinkRequest request);
/**
* Shrinks an index using an explicit request, allowing the settings, mappings and aliases of the target index to be specified.
*/
void shrinkIndex(ShrinkRequest request, ActionListener<ShrinkResponse> listener);
}


@@ -212,6 +212,10 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBui
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shrink.ShrinkAction;
import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest;
import org.elasticsearch.action.admin.indices.shrink.ShrinkRequestBuilder;
import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
@@ -1684,6 +1688,22 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new GetSettingsRequestBuilder(this, GetSettingsAction.INSTANCE, indices);
}
@Override
public ShrinkRequestBuilder prepareShrinkIndex(String sourceIndex, String targetIndex) {
return new ShrinkRequestBuilder(this, ShrinkAction.INSTANCE).setSourceIndex(sourceIndex)
.setTargetIndex(new CreateIndexRequest(targetIndex));
}
@Override
public ActionFuture<ShrinkResponse> shrinkIndex(ShrinkRequest request) {
return execute(ShrinkAction.INSTANCE, request);
}
@Override
public void shrinkIndex(ShrinkRequest request, ActionListener<ShrinkResponse> listener) {
execute(ShrinkAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetSettingsResponse> getSettings(GetSettingsRequest request) {
return execute(GetSettingsAction.INSTANCE, request);


@@ -383,6 +383,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
return mappings.get(mappingType);
}
public static final Setting<String> INDEX_SHRINK_SOURCE_UUID = Setting.simpleString("index.shrink.source.uuid");
public static final Setting<String> INDEX_SHRINK_SOURCE_NAME = Setting.simpleString("index.shrink.source.name");
public Index getMergeSourceIndex() {
return INDEX_SHRINK_SOURCE_UUID.exists(settings) ? new Index(INDEX_SHRINK_SOURCE_NAME.get(settings), INDEX_SHRINK_SOURCE_UUID.get(settings)) : null;
}
/**
* Sometimes, the default mapping exists and an actual mapping is not created yet (introduced),
* in this case, we want to return the default mapping in case it has some default mapping definitions.


@@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -31,11 +32,15 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData.Custom;
import org.elasticsearch.cluster.metadata.IndexMetaData.State;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
@@ -55,17 +60,18 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.script.ScriptService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -80,6 +86,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
@@ -264,7 +272,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
templatesAliases.put(aliasMetaData.alias(), aliasMetaData);
}
}
Settings.Builder indexSettingsBuilder = Settings.builder();
// apply templates, here, in reverse order, since first ones are better matching
for (int i = templates.size() - 1; i >= 0; i--) {
@@ -293,6 +300,11 @@
}
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
final Index shrinkFromIndex = request.shrinkFrom();
if (shrinkFromIndex != null) {
prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex,
request.index());
}
Settings actualIndexSettings = indexSettingsBuilder.build();
@@ -481,4 +493,82 @@
return Regex.simpleMatch(template.template(), request.index());
}
}
/**
* Validates the settings and mappings for shrinking an index.
* @return the list of nodes that hold at least one instance of every shard of the source index
*/
static List<String> validateShrinkIndex(ClusterState state, String sourceIndex,
Set<String> targetIndexMappingsTypes, String targetIndexName,
Settings targetIndexSettings) {
if (state.metaData().hasIndex(targetIndexName)) {
throw new IndexAlreadyExistsException(state.metaData().index(targetIndexName).getIndex());
}
final IndexMetaData sourceMetaData = state.metaData().index(sourceIndex);
if (sourceMetaData == null) {
throw new IndexNotFoundException(sourceIndex);
}
// ensure index is read-only
if (state.blocks().indexBlocked(ClusterBlockLevel.WRITE, sourceIndex) == false) {
throw new IllegalStateException("index " + sourceIndex + " must be read-only to shrink index. use \"index.blocks.write=true\"");
}
if (sourceMetaData.getNumberOfShards() == 1) {
throw new IllegalArgumentException("can't shrink an index with only one shard");
}
if ((targetIndexMappingsTypes.size() > 1 ||
(targetIndexMappingsTypes.isEmpty() || targetIndexMappingsTypes.contains(MapperService.DEFAULT_MAPPING)) == false)) {
throw new IllegalArgumentException("mappings are not allowed when shrinking indices" +
", all mappings are copied from the source index");
}
if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)
&& IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings) > 1) {
throw new IllegalArgumentException("can not shrink index into more than one shard");
}
// now check that index is all on one node
final IndexRoutingTable table = state.routingTable().index(sourceIndex);
Map<String, AtomicInteger> nodesToNumRouting = new HashMap<>();
int numShards = sourceMetaData.getNumberOfShards();
for (ShardRouting routing : table.shardsWithState(ShardRoutingState.STARTED)) {
nodesToNumRouting.computeIfAbsent(routing.currentNodeId(), (s) -> new AtomicInteger(0)).incrementAndGet();
}
List<String> nodesToAllocateOn = new ArrayList<>();
for (Map.Entry<String, AtomicInteger> entries : nodesToNumRouting.entrySet()) {
int numAllocations = entries.getValue().get();
assert numAllocations <= numShards : "wait what? " + numAllocations + " is > than num shards " + numShards;
if (numAllocations == numShards) {
nodesToAllocateOn.add(entries.getKey());
}
}
if (nodesToAllocateOn.isEmpty()) {
throw new IllegalStateException("index " + sourceIndex +
" must have all shards allocated on the same node to shrink index");
}
return nodesToAllocateOn;
}
static void prepareShrinkIndexSettings(ClusterState currentState, Set<String> mappingKeys, Settings.Builder indexSettingsBuilder, Index shrinkFromIndex, String shrinkIntoName) {
final IndexMetaData sourceMetaData = currentState.metaData().index(shrinkFromIndex.getName());
final List<String> nodesToAllocateOn = validateShrinkIndex(currentState, shrinkFromIndex.getName(),
mappingKeys, shrinkIntoName, indexSettingsBuilder.build());
final Predicate<String> analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.")
|| s.startsWith("index.analysis.");
indexSettingsBuilder
// we can only shrink to 1 shard so far!
.put("index.number_of_shards", 1)
// we use "i.r.a.include" rather than "i.r.a.require" since it's allows one of the nodes holding an
// instanceof all shards.
.put("index.routing.allocation.include._id",
Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()))
// we only try once and then give up with a shrink index
.put("index.allocation.max_retries", 1)
// now copy all similarity / analysis settings - this overrides all settings from the user unless they
// wanna add extra settings
.put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate))
.put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName())
.put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID());
}
}


@@ -19,12 +19,14 @@
package org.elasticsearch.cluster.routing;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;


@@ -72,6 +72,7 @@ import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestGetStoredSc
import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutSearchTemplateAction;
import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutStoredScriptAction;
import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction;
import org.elasticsearch.rest.action.admin.indices.RestShrinkIndexAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.get.RestGetAliasesAction;
@@ -209,6 +210,7 @@ public class NetworkModule extends AbstractModule {
RestIndexPutAliasAction.class,
RestIndicesAliasesAction.class,
RestCreateIndexAction.class,
RestShrinkIndexAction.class,
RestDeleteIndexAction.class,
RestCloseIndexAction.class,
RestOpenIndexAction.class,


@@ -35,8 +35,11 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@@ -55,7 +58,10 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.SuspendableRefContainer;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
@@ -102,6 +108,7 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
@@ -125,6 +132,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
public class IndexShard extends AbstractIndexShardComponent {
@@ -920,8 +928,7 @@ public class IndexShard extends AbstractIndexShardComponent {
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
final EngineConfig config = newEngineConfig(openMode, translogConfig, cachingPolicy,
new IndexShardRecoveryPerformer(shardId, mapperService, logger));
final EngineConfig config = newEngineConfig(openMode);
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't lose deletes until we are done recovering
config.setEnableGcDeletes(false);
@@ -1109,20 +1116,37 @@
return path;
}
public boolean recoverFromStore(DiscoveryNode localNode) {
public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards) throws IOException {
final List<LocalShardSnapshot> snapshots = new ArrayList<>();
try {
for (IndexShard shard : localShards) {
snapshots.add(new LocalShardSnapshot(shard));
}
// we are the first primary, recover from the gateway
// if it's post-API allocation, the index should exist
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
} finally {
IOUtils.close(snapshots);
}
}
public boolean recoverFromStore() {
// we are the first primary, recover from the gateway
// if it's post-API allocation, the index should exist
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData());
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
return storeRecovery.recoverFromStore(this, shouldExist);
}
public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode localNode) {
public boolean restoreFromRepository(IndexShardRepository repository) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository, localNode);
return storeRecovery.recoverFromRepository(this, repository);
}
/**
@@ -1337,7 +1361,8 @@ }
}
public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService,
RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService) {
RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndicesService indicesService) {
final RestoreSource restoreSource = shardRouting.restoreSource();
if (shardRouting.isPeerRecovery()) {
@@ -1357,19 +1382,59 @@ }
}
} else if (restoreSource == null) {
// recover from filesystem store
final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
RecoveryState.Type.STORE, localNode, localNode);
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore(localNode)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable t) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true);
}
});
IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
Index mergeSourceIndex = indexMetaData.getMergeSourceIndex();
final boolean recoverFromLocalShards = mergeSourceIndex != null && shardRouting.allocatedPostIndexCreate(indexMetaData) == false && shardRouting.primary();
final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
recoverFromLocalShards ? RecoveryState.Type.LOCAL_SHARDS : RecoveryState.Type.STORE, localNode, localNode);
if (recoverFromLocalShards) {
final List<IndexShard> startedShards = new ArrayList<>();
final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex);
final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1;
if (sourceIndexService != null) {
for (IndexShard shard : sourceIndexService) {
if (shard.state() == IndexShardState.STARTED) {
startedShards.add(shard);
}
}
}
if (numShards == startedShards.size()) {
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable t) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(shardId, localNode, localNode, t), true);
}
});
} else {
final Throwable t;
if (numShards == -1) {
t = new IndexNotFoundException(mergeSourceIndex);
} else {
t = new IllegalStateException("not all shards from index " + mergeSourceIndex
+ " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
+ shardId());
}
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, localNode, localNode, t), true);
}
} else {
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable t) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true);
}
});
}
} else {
// recover from a restore
final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
@@ -1378,7 +1443,7 @@ public class IndexShard extends AbstractIndexShardComponent {
threadPool.generic().execute(() -> {
try {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (restoreFromRepository(indexShardRepository, localNode)) {
if (restoreFromRepository(indexShardRepository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable first) {
@@ -1461,7 +1526,8 @@ public class IndexShard extends AbstractIndexShardComponent {
return mapperService.documentMapperWithAutoCreate(type);
}
private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) {
private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
@@ -1627,5 +1693,4 @@
IndexShard.this.delete(engine, engineDelete);
}
}
}


@@ -0,0 +1,138 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
final class LocalShardSnapshot implements Closeable {
private final IndexShard shard;
private final Store store;
private final IndexCommit indexCommit;
private final AtomicBoolean closed = new AtomicBoolean(false);
public LocalShardSnapshot(IndexShard shard) {
this.shard = shard;
store = shard.store();
store.incRef();
boolean success = false;
try {
indexCommit = shard.snapshotIndex(true);
success = true;
} finally {
if (success == false) {
store.decRef();
}
}
}
Index getIndex() {
return shard.indexSettings().getIndex();
}
Directory getSnapshotDirectory() {
/* this directory will not be used for anything else but reading / copying files to another directory
* we prevent all write operations on this directory with UOE - nobody should close it either. */
return new FilterDirectory(store.directory()) {
@Override
public String[] listAll() throws IOException {
Collection<String> fileNames = indexCommit.getFileNames();
final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
return fileNameArray;
}
@Override
public void deleteFile(String name) throws IOException {
throw new UnsupportedOperationException("this directory is read-only");
}
@Override
public void sync(Collection<String> names) throws IOException {
throw new UnsupportedOperationException("this directory is read-only");
}
@Override
public void renameFile(String source, String dest) throws IOException {
throw new UnsupportedOperationException("this directory is read-only");
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
throw new UnsupportedOperationException("this directory is read-only");
}
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
throw new UnsupportedOperationException("this directory is read-only");
}
@Override
public Lock obtainLock(String name) throws IOException {
/* we explicitly use a no-lock instance since we hold an index commit from a SnapshotDeletionPolicy, so we
* can be certain that nobody messes with the files on disk. We also hold a ref on the store which means
* no external source will delete files either.*/
return NoLockFactory.INSTANCE.obtainLock(in, name);
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException("nobody should close this directory wrapper");
}
};
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
shard.releaseSnapshot(indexCommit);
} finally {
store.decRef();
}
}
}
ImmutableOpenMap<String, MappingMetaData> getMappings() {
return shard.indexSettings.getIndexMetaData().getMappings();
}
@Override
public String toString() {
return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + indexCommit + "]";
}
}


@@ -19,20 +19,27 @@
package org.elasticsearch.index.shard;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
@@ -40,6 +47,11 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -63,12 +75,11 @@ final class StoreRecovery {
* files / transaction logs. This
* @param indexShard the index shard instance to recovery the shard into
* @param indexShouldExists <code>true</code> iff the index should exist on disk, i.e. the shard has been allocated previously on this shard's store.
* @param localNode the reference to the local node
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification or if the cluster state has changed due to async updates.
* @see Store
*/
boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists, DiscoveryNode localNode) {
boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists) {
if (canRecover(indexShard)) {
if (indexShard.routingEntry().restoreSource() != null) {
throw new IllegalStateException("can't recover - restore source is not null");
@@ -81,6 +92,141 @@
return false;
}
boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, final IndexShard indexShard, final List<LocalShardSnapshot> shards) throws IOException {
if (canRecover(indexShard)) {
assert indexShard.recoveryState().getType() == RecoveryState.Type.LOCAL_SHARDS : "invalid recovery type: " + indexShard.recoveryState().getType();
if (indexShard.routingEntry().restoreSource() != null) {
throw new IllegalStateException("can't recover - restore source is not null");
}
if (shards.isEmpty()) {
throw new IllegalArgumentException("shards must not be empty");
}
Set<Index> indices = shards.stream().map((s) -> s.getIndex()).collect(Collectors.toSet());
if (indices.size() > 1) {
throw new IllegalArgumentException("can't add shards from more than one index");
}
for (ObjectObjectCursor<String, MappingMetaData> mapping : shards.get(0).getMappings()) {
mappingUpdateConsumer.accept(mapping.key, mapping.value);
}
for (ObjectObjectCursor<String, MappingMetaData> mapping : shards.get(0).getMappings()) {
indexShard.mapperService().merge(mapping.key,mapping.value.source(), MapperService.MergeReason.MAPPING_RECOVERY, true);
}
return executeRecovery(indexShard, () -> {
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
addIndices(indexShard.recoveryState().getIndex(), directory, shards.stream().map(s -> s.getSnapshotDirectory())
.collect(Collectors.toList()).toArray(new Directory[shards.size()]));
internalRecoverFromStore(indexShard, true);
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false);
} catch (IOException ex) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
}
});
}
return false;
}
final void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Directory... sources) throws IOException {
/*
* TODO: once we upgraded to Lucene 6.1 use HardlinkCopyDirectoryWrapper to enable hardlinks if possible and enable it
* in the security.policy:
*
* grant codeBase "${codebase.lucene-misc-6.1.0.jar}" {
* // needed to allow shard shrinking to use hard-links if possible via lucenes HardlinkCopyDirectoryWrapper
* permission java.nio.file.LinkPermission "hard";
* };
* target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
*/
try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats),
new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we have started it up, otherwise we would need to wait for it here.
// we also don't specify a codec here, merges should use the engine's codec for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE))) {
writer.addIndexes(sources);
writer.commit();
}
}
/**
* Directory wrapper that records copy process for recovery statistics
*/
static final class StatsDirectoryWrapper extends FilterDirectory {
private final RecoveryState.Index index;
StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) {
super(in);
this.index = indexRecoveryStats;
}
@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
final long l = from.fileLength(src);
final AtomicBoolean copies = new AtomicBoolean(false);
// here we wrap the index input from the source directory to report progress of the file copy for the recovery stats.
// we increment the num bytes recovered in the readBytes method below; if users pull statistics they can see immediately
// how much has been recovered.
in.copyFrom(new FilterDirectory(from) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
index.addFileDetail(dest, l, false);
copies.set(true);
final IndexInput input = in.openInput(name, context);
return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") {
@Override
public void close() throws IOException {
input.close();
}
@Override
public long getFilePointer() {
throw new UnsupportedOperationException("only straight copies are supported");
}
@Override
public void seek(long pos) throws IOException {
throw new UnsupportedOperationException("seeks are not supported");
}
@Override
public long length() {
return input.length();
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
throw new UnsupportedOperationException("slices are not supported");
}
@Override
public byte readByte() throws IOException {
throw new UnsupportedOperationException("use a buffer if you wanna perform well");
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
// we rely on the fact that copyFrom uses a buffer
input.readBytes(b, offset, len);
index.addRecoveredBytesToFile(dest, len);
}
};
}
}, src, dest, context);
if (copies.get() == false) {
index.addFileDetail(dest, l, true); // hardlinked - we treat it as reused since the file was already somewhat there
} else {
assert index.getFileDetails(dest) != null : "File [" + dest + "] has no file details";
assert index.getFileDetails(dest).recovered() == l : index.getFileDetails(dest).toString();
}
}
}
/**
* Recovers an index from a given {@link IndexShardRepository}. This method restores a
* previously created index snapshot into an existing initializing shard.
@@ -89,7 +235,7 @@
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
*/
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) {
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
if (canRecover(indexShard)) {
final ShardRouting shardRouting = indexShard.routingEntry();
if (shardRouting.restoreSource() == null) {
@@ -200,29 +346,28 @@
// it exists in the directory, but shouldn't exist on the FS - it's a leftover (possibly dangling)
// it's a "new index create" API, we have to do something, so better to clean it than use the same data
logger.trace("cleaning existing shard, shouldn't exist");
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
Lucene.cleanLuceneIndex(store.directory());
}
}
} catch (Throwable e) {
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
index.addFileDetail(name, length, true);
}
if (si != null && recoveryState.getType() == RecoveryState.Type.STORE) {
addRecoveredFileDetails(si, store, index);
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
indexShard.performTranslogRecovery(indexShouldExists);
if (recoveryState.getType() == RecoveryState.Type.LOCAL_SHARDS) {
assert indexShouldExists;
indexShard.skipTranslogRecovery();
} else {
indexShard.performTranslogRecovery(indexShouldExists);
}
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException | IOException e) {
@ -232,6 +377,14 @@ final class StoreRecovery {
}
}
private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
index.addFileDetail(name, length, true);
}
}
/**
* Restores shard from {@link RestoreSource} associated with this shard in routing table
*/
@ -260,4 +413,5 @@ final class StoreRecovery {
throw new IndexShardRestoreFailedException(shardId, "restore failed", t);
}
}
}

View File

@ -682,7 +682,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return refCounter.refCount();
}
private static final class StoreDirectory extends FilterDirectory {
static final class StoreDirectory extends FilterDirectory {
private final ESLogger deletesLogger;
@ -715,7 +715,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public String toString() {
return "store(" + in.toString() + ")";
}
}
/**

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@ -68,6 +69,8 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
@ -473,7 +476,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
DiscoveryNodes nodes = event.state().nodes();
for (final ShardRouting shardRouting : routingNode) {
final IndexService indexService = indicesService.indexService(shardRouting.index());
if (indexService == null) {
@ -611,7 +613,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
indexShard.startRecovery(nodes.getLocalNode(), sourceNode, recoveryTargetService,
new RecoveryListener(shardRouting, indexService), repositoriesService);
new RecoveryListener(shardRouting, indexService), repositoriesService, (type, mapping) -> {
try {
nodeServicesProvider.getClient().admin().indices().preparePutMapping()
.setConcreteIndex(indexService.index()) // concrete index - no name clash, it uses uuid
.setType(type)
.setSource(mapping.source().string())
.get();
} catch (IOException ex) {
throw new ElasticsearchException("failed to stringify mapping source", ex);
}
}, indicesService);
}
/**
@ -667,7 +679,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
case STORE: return "after recovery from store";
case PRIMARY_RELOCATION: return "after recovery (primary relocation) from node [" + state.getSourceNode() + "]";
case REPLICA: return "after recovery (replica) from node [" + state.getSourceNode() + "]";
default: throw new IllegalArgumentException(state.getType().name());
case LOCAL_SHARDS: return "after recovery from local shards";
default: throw new IllegalArgumentException("Unknown recovery type: " + state.getType().name());
}
}

View File

@ -96,11 +96,12 @@ public class RecoveryState implements ToXContent, Streamable {
}
}
public static enum Type {
public enum Type {
STORE((byte) 0),
SNAPSHOT((byte) 1),
REPLICA((byte) 2),
PRIMARY_RELOCATION((byte) 3);
PRIMARY_RELOCATION((byte) 3),
LOCAL_SHARDS((byte) 4);
private static final Type[] TYPES = new Type[Type.values().length];
@ -978,5 +979,9 @@ public class RecoveryState implements ToXContent, Streamable {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
}
public File getFileDetails(String dest) {
return fileDetails.get(dest);
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
/**
*
*/
public class RestShrinkIndexAction extends BaseRestHandler {
@Inject
public RestShrinkIndexAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(RestRequest.Method.PUT, "/{index}/_shrink/{target}", this);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_shrink/{target}", this);
}
@SuppressWarnings({"unchecked"})
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
if (request.param("target") == null) {
throw new IllegalArgumentException("no target index");
}
if (request.param("index") == null) {
throw new IllegalArgumentException("no source index");
}
ShrinkRequest shrinkIndexRequest = new ShrinkRequest(request.param("target"), request.param("index"));
if (request.hasContent()) {
shrinkIndexRequest.getShrinkIndexRequest().source(request.content());
}
shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -112,7 +112,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(IndexShardRepository, DiscoveryNode)} )}
* {@link IndexShard#restoreFromRepository(IndexShardRepository)} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>

View File

@ -28,23 +28,31 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.junit.Ignore;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -277,4 +285,98 @@ public class CreateIndexIT extends ESIntegTestCase {
internalCluster().fullRestart();
ensureGreen("test");
}
public void testCreateShrinkIndex() {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7))).get();
for (int i = 0; i < 20; i++) {
client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
}
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
String mergeNode = discoveryNodes[0].getName();
// relocate all shards to one node such that we can merge it.
client().admin().indices().prepareUpdateSettings("source")
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", mergeNode)
.put("index.blocks.write", true)).get();
ensureGreen();
// now merge source into a single shard index
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
// let it be allocated anywhere and bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.putNull("index.routing.allocation.include._id")
.put("index.number_of_replicas", 1)).get();
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
for (int i = 20; i < 40; i++) {
client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
}
flushAndRefresh();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
}
/**
* Tests that we can manually recover from a failed allocation due to shards being moved away etc.
*/
public void testCreateShrinkIndexFails() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
.put("number_of_shards", randomIntBetween(2, 7))
.put("number_of_replicas", 0)).get();
for (int i = 0; i < 20; i++) {
client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
}
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
String spareNode = discoveryNodes[0].getName();
String mergeNode = discoveryNodes[1].getName();
// relocate all shards to one node such that we can merge it.
client().admin().indices().prepareUpdateSettings("source")
.setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode)
.put("index.blocks.write", true)).get();
ensureGreen();
// now merge source into a single shard index
client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder()
.put("index.routing.allocation.exclude._name", mergeNode) // we manually exclude the merge node to forcefully fuck it up
.put("index.number_of_replicas", 0)
.put("index.allocation.max_retries", 1).build()).get();
// now we move all shards away from the merge node
client().admin().indices().prepareUpdateSettings("source")
.setSettings(Settings.builder().put("index.routing.allocation.require._name", spareNode)
.put("index.blocks.write", true)).get();
ensureGreen("source");
client().admin().indices().prepareUpdateSettings("target") // erase the forcefully fuckup!
.setSettings(Settings.builder().putNull("index.routing.allocation.exclude._name")).get();
// wait until it fails
assertBusy(() -> {
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
RoutingTable routingTables = clusterStateResponse.getState().routingTable();
assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned());
assertEquals(UnassignedInfo.Reason.ALLOCATION_FAILED,
routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason());
assertEquals(1,
routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations());
});
client().admin().indices().prepareUpdateSettings("source") // now relocate them all to the right node
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", mergeNode)).get();
ensureGreen("source");
client().admin().cluster().prepareReroute().setRetryFailed(true).get(); // kick off a retry and wait until it's done!
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shrink;
import org.apache.lucene.index.IndexWriter;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import static java.util.Collections.emptyMap;
public class TransportShrinkActionTests extends ESTestCase {
private ClusterState createClusterState(String name, int numShards, int numReplicas, Settings settings) {
MetaData.Builder metaBuilder = MetaData.builder();
IndexMetaData indexMetaData = IndexMetaData.builder(name).settings(settings(Version.CURRENT)
.put(settings))
.numberOfShards(numShards).numberOfReplicas(numReplicas).build();
metaBuilder.put(indexMetaData, false);
MetaData metaData = metaBuilder.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsNew(metaData.index(name));
RoutingTable routingTable = routingTableBuilder.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData).routingTable(routingTable).blocks(ClusterBlocks.builder().addBlocks(indexMetaData)).build();
return clusterState;
}
public void testErrorCondition() {
ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10),
Settings.builder().put("index.blocks.write", true).build());
DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000));
assertEquals("Can't merge index with more than [2147483519] docs - too many documents",
expectThrows(IllegalStateException.class, () ->
TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state,
new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY))
).getMessage());
// create one that won't fail
ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0,
Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1")))
.build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState, stats,
new IndexNameExpressionResolver(Settings.EMPTY));
}
public void testShrinkIndexSettings() {
String indexName = randomAsciiOfLength(10);
// create one that won't fail
ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0,
Settings.builder()
.put("index.blocks.write", true)
.build())).nodes(DiscoveryNodes.builder().put(newNode("node1")))
.build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000));
ShrinkRequest target = new ShrinkRequest("target", indexName);
CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest(
target, clusterState, stats, new IndexNameExpressionResolver(Settings.EMPTY));
assertNotNull(request.shrinkFrom());
assertEquals(indexName, request.shrinkFrom().getName());
assertEquals("1", request.settings().get("index.number_of_shards"));
assertEquals("shrink_index", request.cause());
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, emptyMap(),
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), Version.CURRENT);
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import static java.util.Collections.emptyMap;
public class MetaDataCreateIndexServiceTests extends ESTestCase {
private ClusterState createClusterState(String name, int numShards, int numReplicas, Settings settings) {
MetaData.Builder metaBuilder = MetaData.builder();
IndexMetaData indexMetaData = IndexMetaData.builder(name).settings(settings(Version.CURRENT)
.put(settings))
.numberOfShards(numShards).numberOfReplicas(numReplicas).build();
metaBuilder.put(indexMetaData, false);
MetaData metaData = metaBuilder.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsNew(metaData.index(name));
RoutingTable routingTable = routingTableBuilder.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData).routingTable(routingTable).blocks(ClusterBlocks.builder().addBlocks(indexMetaData)).build();
return clusterState;
}
public void testValidateShrinkIndex() {
ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10),
Settings.builder().put("index.blocks.write", true).build());
assertEquals("index [source] already exists",
expectThrows(IndexAlreadyExistsException.class, () ->
MetaDataCreateIndexService.validateShrinkIndex(state, "target", Collections.emptySet(), "source", Settings.EMPTY)
).getMessage());
assertEquals("no such index",
expectThrows(IndexNotFoundException.class, () ->
MetaDataCreateIndexService.validateShrinkIndex(state, "no such index", Collections.emptySet(), "target", Settings.EMPTY)
).getMessage());
assertEquals("can't shrink an index with only one shard",
expectThrows(IllegalArgumentException.class, () ->
MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0,
Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(),
"target", Settings.EMPTY)
).getMessage());
assertEquals("index source must be read-only to shrink index. use \"index.blocks.write=true\"",
expectThrows(IllegalStateException.class, () ->
MetaDataCreateIndexService.validateShrinkIndex(
createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), Settings.EMPTY)
, "source", Collections.emptySet(), "target", Settings.EMPTY)
).getMessage());
assertEquals("index source must have all shards allocated on the same node to shrink index",
expectThrows(IllegalStateException.class, () ->
MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", Settings.EMPTY)
).getMessage());
assertEquals("can not shrink index into more than one shard",
expectThrows(IllegalArgumentException.class, () ->
MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target",
Settings.builder().put("index.number_of_shards", 2).build())
).getMessage());
assertEquals("mappings are not allowed when shrinking indices, all mappings are copied from the source index",
expectThrows(IllegalArgumentException.class, () -> {
MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.singleton("foo"),
"target", Settings.EMPTY);
}
).getMessage());
// create one that won't fail
ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0,
Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1")))
.build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target", Settings.EMPTY);
}
public void testShrinkIndexSettings() {
String indexName = randomAsciiOfLength(10);
// create one that won't fail
ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0,
Settings.builder()
.put("index.blocks.write", true)
.put("index.similarity.default.type", "BM25")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword")
.build())).nodes(DiscoveryNodes.builder().put(newNode("node1")))
.build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
routingTable = service.applyStartedShards(clusterState,
routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
Settings.Builder builder = Settings.builder();
MetaDataCreateIndexService.prepareShrinkIndexSettings(
clusterState, Collections.emptySet(), builder, clusterState.metaData().index(indexName).getIndex(), "target");
assertEquals("1", builder.build().get("index.number_of_shards"));
assertEquals("similarity settings must be copied", "BM25", builder.build().get("index.similarity.default.type"));
assertEquals("analysis settings must be copied",
"keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer"));
assertEquals("node1", builder.build().get("index.routing.allocation.include._id"));
assertEquals("1", builder.build().get("index.allocation.max_retries"));
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, emptyMap(),
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), Version.CURRENT);
}
}

View File

@ -52,6 +52,12 @@ public class ShardRoutingHelper {
return routing.reinitializeShard().updateUnassignedInfo(new UnassignedInfo(reason, "test_reinit"));
}
public static ShardRouting initWithSameId(ShardRouting copy) {
return new ShardRouting(copy.shardId(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(),
copy.primary(), ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, null),
copy.allocationId(), copy.getExpectedShardSize());
}
public static ShardRouting moveToUnassigned(ShardRouting routing, UnassignedInfo info) {
return routing.moveToUnassigned(info);
}

View File

@ -31,9 +31,11 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
@ -46,6 +48,7 @@ import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
@ -58,6 +61,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
@ -118,6 +122,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -975,7 +980,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore(localNode));
assertTrue(newShard.recoverFromStore());
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
@ -1003,7 +1008,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
localNode));
assertTrue(newShard.recoverFromStore(localNode));
assertTrue(newShard.recoverFromStore());
assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
@ -1037,7 +1042,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard.updateRoutingEntry(routing, false);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
try {
newShard.recoverFromStore(localNode);
newShard.recoverFromStore();
fail("index not there!");
} catch (IndexShardRecoveryException ex) {
assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over"));
@ -1056,7 +1061,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore());
newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true);
SearchResponse response = client().prepareSearch().get();
@ -1142,7 +1147,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
@Override
public void verify(String verificationToken) {
}
}, localNode));
}));
test_target_shard.updateRoutingEntry(routing.moveToStarted(), true);
assertHitCount(client().prepareSearch("test_target").get(), 1);
@ -1398,7 +1403,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore(localNode));
assertTrue(newShard.recoverFromStore());
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true);
return newShard;
}
@ -1507,4 +1512,83 @@ public class IndexShardTests extends ESSingleNodeTestCase {
// Shard should now be active since we did recover:
assertTrue(newShard.isActive());
}
public void testRecoverFromLocalShard() throws IOException {
createIndex("index");
createIndex("index_1");
createIndex("index_2");
client().admin().indices().preparePutMapping("index").setType("test").setSource(jsonBuilder().startObject()
.startObject("test")
.startObject("properties")
.startObject("foo")
.field("type", "text")
.endObject()
.endObject().endObject().endObject()).get();
client().prepareIndex("index", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
client().prepareIndex("index", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("index_1"));
IndexShard shard = test.getShardOrNull(0);
ShardRouting routing = ShardRoutingHelper.initWithSameId(shard.routingEntry());
test.removeShard(0, "b/c simon says so");
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
{
final IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode));
BiConsumer<String, MappingMetaData> mappingConsumer = (type, mapping) -> {
try {
client().admin().indices().preparePutMapping().setConcreteIndex(newShard.indexSettings().getIndex())
.setType(type)
.setSource(mapping.source().string())
.get();
} catch (IOException ex) {
throw new ElasticsearchException("failed to stringify mapping source", ex);
}
};
expectThrows(IllegalArgumentException.class, () -> {
IndexService index = indicesService.indexService(resolveIndex("index"));
IndexService index_2 = indicesService.indexService(resolveIndex("index_2"));
newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(index.getShard(0), index_2.getShard(0)));
});
IndexService indexService = indicesService.indexService(resolveIndex("index"));
assertTrue(newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(indexService.getShard(0))));
RecoveryState recoveryState = newShard.recoveryState();
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
assertTrue(recoveryState.getIndex().fileDetails().size() > 0);
for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
if (file.reused()) {
assertEquals(file.recovered(), 0);
} else {
assertEquals(file.recovered(), file.length());
}
}
routing = ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
assertHitCount(client().prepareSearch("index_1").get(), 2);
}
// now check that it's persistent ie. that the added shards are committed
{
routing = shard.routingEntry();
test.removeShard(0, "b/c simon says so");
routing = ShardRoutingHelper.reinit(routing);
final IndexShard newShard = test.createShard(routing);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode));
assertTrue(newShard.recoverFromStore());
routing = ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
assertHitCount(client().prepareSearch("index_1").get(), 2);
}
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("index_1").get();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = mappingsResponse.getMappings();
assertNotNull(mappings.get("index_1"));
assertNotNull(mappings.get("index_1").get("test"));
assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}");
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.Version;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.function.Predicate;
public class StoreRecoveryTests extends ESTestCase {
public void testAddIndices() throws IOException {
Directory[] dirs = new Directory[randomIntBetween(1, 10)];
final int numDocs = randomIntBetween(50, 100);
int id = 0;
for (int i = 0; i < dirs.length; i++) {
dirs[i] = newFSDirectory(createTempDir());
IndexWriter writer = new IndexWriter(dirs[i], newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE));
for (int j = 0; j < numDocs; j++) {
writer.addDocument(Arrays.asList(new StringField("id", Integer.toString(id++), Field.Store.YES)));
}
writer.commit();
writer.close();
}
StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
RecoveryState.Index indexStats = new RecoveryState.Index();
Directory target = newFSDirectory(createTempDir());
storeRecovery.addIndices(indexStats, target, dirs);
int numFiles = 0;
Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
&& f.startsWith("extra") == false;
for (Directory d : dirs) {
numFiles += Arrays.asList(d.listAll()).stream().filter(filesFilter).count();
}
final long targetNumFiles = Arrays.asList(target.listAll()).stream().filter(filesFilter).count();
assertEquals(numFiles, targetNumFiles);
assertEquals(indexStats.totalFileCount(), targetNumFiles);
if (hardLinksSupported(createTempDir())) {
assertEquals("upgrade to HardlinkCopyDirectoryWrapper in Lucene 6.1", Version.LATEST, Version.LUCENE_6_0_0);
// assertEquals(indexStats.reusedFileCount(), targetNumFiles); -- uncomment this once upgraded to Lucene 6.1
assertEquals(indexStats.reusedFileCount(), 0);
} else {
assertEquals(indexStats.reusedFileCount(), 0);
}
DirectoryReader reader = DirectoryReader.open(target);
SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target);
for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
assertEquals("all sources must be flush", info.info.getDiagnostics().get("source"), "flush");
}
assertEquals(reader.numDeletedDocs(), 0);
assertEquals(reader.numDocs(), id);
reader.close();
target.close();
IOUtils.close(dirs);
}
public void testStatsDirWrapper() throws IOException {
Directory dir = newDirectory();
Directory target = newDirectory();
RecoveryState.Index indexStats = new RecoveryState.Index();
StoreRecovery.StatsDirectoryWrapper wrapper = new StoreRecovery.StatsDirectoryWrapper(target, indexStats);
try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, "foo", 0);
int numBytes = randomIntBetween(100, 20000);
for (int i = 0; i < numBytes; i++) {
output.writeByte((byte)i);
}
CodecUtil.writeFooter(output);
}
wrapper.copyFrom(dir, "foo.bar", "bar.foo", IOContext.DEFAULT);
assertNotNull(indexStats.getFileDetails("bar.foo"));
assertNull(indexStats.getFileDetails("foo.bar"));
assertEquals(dir.fileLength("foo.bar"), indexStats.getFileDetails("bar.foo").length());
assertEquals(dir.fileLength("foo.bar"), indexStats.getFileDetails("bar.foo").recovered());
assertFalse(indexStats.getFileDetails("bar.foo").reused());
IOUtils.close(dir, target);
}
public boolean hardLinksSupported(Path path) throws IOException {
try {
Files.createFile(path.resolve("foo.bar"));
Files.createLink(path.resolve("test"), path.resolve("foo.bar"));
BasicFileAttributes destAttr = Files.readAttributes(path.resolve("test"), BasicFileAttributes.class);
BasicFileAttributes sourceAttr = Files.readAttributes(path.resolve("foo.bar"), BasicFileAttributes.class);
// under the test security manager we won't get here - no permission to create hard links
return destAttr.fileKey() != null
&& destAttr.fileKey().equals(sourceAttr.fileKey());
} catch (AccessControlException ex) {
return true; // if we run into that situation we know it's supported.
} catch (UnsupportedOperationException ex) {
return false;
}
}
}

View File

@ -95,9 +95,7 @@ import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class StoreTests extends ESTestCase {

View File

@ -453,7 +453,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertEquals(1, imc.availableShards().size());
assertTrue(newShard.recoverFromStore(localNode));
assertTrue(newShard.recoverFromStore());
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
newShard.updateRoutingEntry(routing.moveToStarted(), true);
} finally {

View File

@ -106,7 +106,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE,
emptyMap(), emptySet(), Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode));
shard.recoverFromStore(localNode);
shard.recoverFromStore();
newRouting = ShardRoutingHelper.moveToStarted(newRouting);
shard.updateRoutingEntry(newRouting, true);
} finally {

View File

@ -15,6 +15,7 @@ index settings, aliases, mappings, and index templates.
* <<indices-get-index>>
* <<indices-exists>>
* <<indices-open-close>>
* <<indices-shrink-index>>
[float]
[[mapping-management]]

View File

@ -0,0 +1,83 @@
[[indices-shrink-index]]
== Shrink Index
The shrink index API allows you to shrink an existing index into a new index with a single shard.
In order to shrink an index, all of its shards must be allocated on a single node in the cluster.
This is required since the shrink command copies all of the shards' index files into the target index
data folder when the primary of the target index is initially allocated.
While an index is being shrunk, no write operations should happen to the source index. Elasticsearch
enforces the `read-only` property when the shrink command is executed. All operations necessary to shrink the
source index are executed during initial primary recovery. Once the target index's primary shard is started, the
shrink operation has finished successfully. To monitor status and progress, use <<cat-recovery>>.
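For example, assuming the target index is named `logs_single_shard` as in the examples below, the
byte-level copy progress can be watched with:

[source,js]
--------------------------------------------------
$ curl -XGET 'http://localhost:9200/_cat/recovery/logs_single_shard?v'
--------------------------------------------------

Since the shrink is implemented as a local shard recovery, all copy operations are reflected in this output.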
To shrink an index, all shards of that index must be allocated on a single node.
[source,js]
--------------------------------------------------
$ curl -XPUT 'http://localhost:9200/logs/_settings' -d '{
"settings" : {
"index.routing.allocation.require._name" : "shrink_node_name", <1>
"index.blocks.write" : true <2>
}
}'
--------------------------------------------------
<1> Forces the relocation of all of the index's shards to the node `shrink_node_name`
<2> Prevents write operations to this index while still allowing metadata changes like deleting the index.
The example above shows how an index called `logs` can be
forced to allocate at least one copy of each shard on a specific node in the cluster.
The `_shrink` API is similar to <<indices-create-index>> and accepts `settings` and `aliases` for the target index.
[source,js]
--------------------------------------------------
$ curl -XPUT 'http://localhost:9200/logs/_shrink/logs_single_shard' -d '{
"settings" : {
"index.codec" : "best_compression", <1>
"index.number_of_replicas" : 0 <2>
}
}'
--------------------------------------------------
<1> Enables `best_compression` codec on the target index
<2> Sets the number of replicas on the target index to `0` to ensure the cluster is green once the primary shard has initialized
The API call above returns immediately once the target index is created, but doesn't wait
for the shrink operation to start. Once the target index's primary shard moves to state `initializing`,
the shrink operation has started.
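Because the call doesn't block, a simple way to wait for the shrink to complete is to wait for the
target index to turn `green`; a minimal sketch, again assuming the target is named `logs_single_shard`:

[source,js]
--------------------------------------------------
$ curl -XGET 'http://localhost:9200/_cluster/health/logs_single_shard?wait_for_status=green'
--------------------------------------------------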
Once the index has been shrunk, the number of replicas can be set to `1` and the allocation filtering can be removed.
[source,js]
--------------------------------------------------
$ curl -XPUT 'http://localhost:9200/logs_single_shard/_settings' -d '{
"settings" : {
"index.routing.allocation.include._id" : null, <1>
"index.number_of_replicas" : 1 <2>
}
}'
--------------------------------------------------
<1> Resets the allocation filtering for the new shrunk index to allow replica allocation
<2> Bumps the number of replicas to 1
[float]
[[shrink-index-limitations]]
=== Limitations
Indices can only be shrunk into a single shard if they fulfill the following requirements:
* a copy of every one of the index's shards must be allocated on a single node
* the index must not contain more than `2147483519` documents (about 2.14 billion) in total (sum of all shards),
as this is the maximum number of documents a single Elasticsearch shard can hold
* the index must have more than one shard
* the index must be `read-only`, i.e. have a cluster block set via `index.blocks.write=true`
* the target index must not exist
* all `index.analysis.*` and `index.similarity.*` settings passed to the `_shrink` call will be overwritten with the
source index's settings
* if the target index can't be allocated on the shrink node, due to throttling or other allocation deciders,
its primary shard will stay `unassigned` until it can be allocated on that node; see the retry example below
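Following up on the last point, a failed allocation of the target primary can be retried once its cause
has been resolved. A minimal sketch using the reroute API's `retry_failed` flag (the same mechanism the
integration test above drives via `prepareReroute().setRetryFailed(true)`):

[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/_cluster/reroute?retry_failed=true'
--------------------------------------------------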

View File

@ -23,9 +23,8 @@ import java.io.IOException;
import java.nio.file.Path;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.index.store.SmbDirectoryWrapper;
public class SmbMMapDirectoryTests extends ESBaseDirectoryTestCase {
public class SmbMMapDirectoryTests extends EsBaseDirectoryTestCase {
@Override
protected Directory getDirectory(Path file) throws IOException {

View File

@ -23,9 +23,8 @@ import java.io.IOException;
import java.nio.file.Path;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.index.store.SmbDirectoryWrapper;
public class SmbSimpleFSDirectoryTests extends ESBaseDirectoryTestCase {
public class SmbSimpleFSDirectoryTests extends EsBaseDirectoryTestCase {
@Override
protected Directory getDirectory(Path file) throws IOException {

View File

@ -0,0 +1,35 @@
{
"indices.shrink": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shrink-index.html",
"methods": ["PUT", "POST"],
"url": {
"path": "/{index}/_shrink/{target}",
"paths": ["/{index}/_shrink/{target}"],
"parts": {
"index": {
"type" : "string",
"required" : true,
"description" : "The name of the source index to shrink"
},
"target": {
"type" : "string",
"required" : true,
"description" : "The name of the target index to shrink into"
}
},
"params": {
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"master_timeout": {
"type" : "time",
"description" : "Specify timeout for connection to master"
}
}
},
"body": {
"description" : "The configuration for the target index (`settings` and `aliases`)"
}
}
}

View File

@ -0,0 +1,72 @@
---
"Shrink index via API":
# creates an index with one document.
# relocates all its shards to one node
# shrinks it into a new index with a single shard
- do:
indices.create:
index: source
body:
settings:
number_of_replicas: "0"
- do:
index:
index: source
type: test
id: "1"
body: { "foo": "hello world" }
- do:
get:
index: source
type: test
id: "1"
- match: { _index: source }
- match: { _type: test }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
# relocate everything to the master node and make it read-only
- do:
indices.put_settings:
index: source
body:
index.routing.allocation.include._id: $master
index.blocks.write: true
index.number_of_replicas: 0
- do:
cluster.health:
wait_for_status: green
index: source
wait_for_relocating_shards: 0
# now we do the actual shrink
- do:
indices.shrink:
index: "source"
target: "target"
body:
index.number_of_replicas: 0
- do:
cluster.health:
wait_for_status: green
- do:
get:
index: target
type: test
id: "1"
- match: { _index: target }
- match: { _type: test }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }

View File

@ -25,6 +25,8 @@ import org.apache.lucene.store.BaseDirectoryTestCase;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.bootstrap.BootstrapForTesting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
/**
@ -36,8 +38,11 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
@TimeoutSuite(millis = TimeUnits.HOUR)
@LuceneTestCase.SuppressReproduceLine
@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
public abstract class ESBaseDirectoryTestCase extends BaseDirectoryTestCase {
public abstract class EsBaseDirectoryTestCase extends BaseDirectoryTestCase {
static {
BootstrapForTesting.ensureInitialized();
}
protected final ESLogger logger = Loggers.getLogger(getClass());
}