Add a `_freeze` / `_unfreeze` API (#35592)
This commit adds a rest endpoint for freezing and unfreezing an index. Among other cleanups mainly fixing an issue accessing package private APIs from a plugin that got caught by integration tests this change also adds documentation for frozen indices. Note: frozen indices are marked as `beta` and available as a basic feature. Relates to #34352
This commit is contained in:
parent
a989b675b5
commit
29ef442841
|
@ -0,0 +1,56 @@
|
|||
[role="xpack"]
|
||||
[testenv="basic"]
|
||||
[[frozen-indices]]
|
||||
= Frozen Indices
|
||||
|
||||
[partintro]
|
||||
--
|
||||
Elasticsearch indices can require a significant amount of memory available in order to be open and searchable. Yet, not all indices need
|
||||
to be writable at the same time and have different access patterns over time. For example, indices in the time series or logging use cases
|
||||
are unlikely to be queried once they age out but still need to be kept around for retention policy purposes.
|
||||
|
||||
In order to keep indices available and queryable for a longer period but at the same time reduce their hardware requirements they can be transitioned
|
||||
into a frozen state. Once an index is frozen, all of its transient shard memory (aside from mappings and analyzers)
|
||||
is moved to persistent storage. This allows for a much higher disk to heap storage ratio on individual nodes. Once an index is
|
||||
frozen, it is made read-only and drops its transient data structures from memory. These data structures will need to be reloaded on demand (and subsequently dropped) for each search request that targets the frozen index. A search request that hits
|
||||
one or more frozen shards will be executed on a throttled threadpool that ensures that we never search more than
|
||||
`N` (`1` by default) searches concurrently (see <<search-throttled>>). This protects nodes from exceeding the available memory due to incoming search requests.
|
||||
|
||||
In contrast to ordinary open indices, frozen indices are expected to execute slowly and are not designed for high query load. Parallelism is
|
||||
gained only on a per-node level and loading data-structures on demand is expected to be one or more orders of a magnitude slower than query
|
||||
execution on a per shard level. Depending on the data in an index, a frozen index may execute searches in the seconds to minutes range, when the same index in an unfrozen state may execute the same search request in milliseconds.
|
||||
--
|
||||
|
||||
== Best Practices
|
||||
|
||||
Since frozen indices provide a much higher disk to heap ratio at the expense of search latency, it is advisable to allocate frozen indices to
|
||||
dedicated nodes to prevent searches on frozen indices influencing traffic on low latency nodes. There is significant overhead in loading
|
||||
data structures on demand which can cause page faults and garbage collections, which further slow down query execution.
|
||||
|
||||
Since indices that are eligible for freezing are unlikely to change in the future, disk space can be optimized as described in <<tune-for-disk-usage>>.
|
||||
|
||||
== Searching a frozen index
|
||||
|
||||
Frozen indices are throttled in order to limit memory consumptions per node. The number of concurrently loaded frozen indices per node is
|
||||
limited by the number of threads in the <<search-throttled>> threadpool, which is `1` by default.
|
||||
Search requests will not be executed against frozen indices by default, even if a frozen index is named explicitly. This is
|
||||
to prevent accidental slowdowns by targeting a frozen index by mistake. To include frozen indices a search request must be executed with
|
||||
the query parameter `ignore_throttled=false`.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET /twitter/_search?q=user:kimchy&ignore_throttled=false
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
[IMPORTANT]
|
||||
================================
|
||||
While frozen indices are slow to search, they can be pre-filtered efficiently. The request parameter `pre_filter_shard_size` specifies
|
||||
a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match.
|
||||
This filter phase can limit the number of shards significantly. For instance, if a date range filter is applied, then all indices (frozen or unfrozen) that do not contain documents within the date range can be skipped efficiently.
|
||||
The default value for `pre_filter_shard_size` is `128` but it's recommended to set it to `1` when searching frozen indices. There is no
|
||||
significant overhead associated with this pre-filter phase.
|
||||
================================
|
||||
|
||||
|
|
@ -63,6 +63,8 @@ include::monitoring/index.asciidoc[]
|
|||
|
||||
include::rollup/index.asciidoc[]
|
||||
|
||||
include::frozen-indices.asciidoc[]
|
||||
|
||||
include::rest-api/index.asciidoc[]
|
||||
|
||||
include::commands/index.asciidoc[]
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
[role="xpack"]
|
||||
[testenv="basic"]
|
||||
[[freeze-index-api]]
|
||||
== Freeze Index API
|
||||
++++
|
||||
<titleabbrev>Freeze Index</titleabbrev>
|
||||
++++
|
||||
|
||||
Freezes an index.
|
||||
|
||||
[float]
|
||||
=== Request
|
||||
|
||||
`POST /<index>/_freeze`
|
||||
|
||||
[float]
|
||||
=== Description
|
||||
|
||||
A frozen index has almost no overhead on the cluster (except
|
||||
for maintaining its metadata in memory), and is blocked for write operations.
|
||||
See <<frozen-indices>> and <<unfreeze-index-api>>.
|
||||
|
||||
[float]
|
||||
=== Path Parameters
|
||||
|
||||
`index` (required)::
|
||||
(string) Identifier for the index
|
||||
|
||||
//=== Query Parameters
|
||||
|
||||
//=== Authorization
|
||||
|
||||
[float]
|
||||
=== Examples
|
||||
|
||||
The following example freezes and unfreezes an index:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST /my_index/_freeze
|
||||
POST /my_index/_unfreeze
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[s/^/PUT my_index\n/]
|
||||
|
||||
[IMPORTANT]
|
||||
================================
|
||||
Freezing an index will close the index and reopen it within the same API call. This causes primaries to not be allocated for a short
|
||||
amount of time and causes the cluster to go red until the primaries are allocated again. This limitation might be removed in the future.
|
||||
================================
|
|
@ -0,0 +1,50 @@
|
|||
[role="xpack"]
|
||||
[testenv="basic"]
|
||||
[[unfreeze-index-api]]
|
||||
== Unfreeze Index API
|
||||
++++
|
||||
<titleabbrev>Unfreeze Index</titleabbrev>
|
||||
++++
|
||||
|
||||
Unfreezes an index.
|
||||
|
||||
[float]
|
||||
=== Request
|
||||
|
||||
`POST /<index>/_unfreeze`
|
||||
|
||||
[float]
|
||||
=== Description
|
||||
|
||||
When a frozen index is unfrozen, the index goes through the normal recovery
|
||||
process and becomes writeable again. See <<frozen-indices>> and <<freeze-index-api>>.
|
||||
|
||||
[float]
|
||||
=== Path Parameters
|
||||
|
||||
`index` (required)::
|
||||
(string) Identifier for the index
|
||||
|
||||
|
||||
//=== Query Parameters
|
||||
|
||||
//=== Authorization
|
||||
|
||||
[float]
|
||||
=== Examples
|
||||
|
||||
The following example freezes and unfreezes an index:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST /my_index/_freeze
|
||||
POST /my_index/_unfreeze
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[s/^/PUT my_index\n/]
|
||||
|
||||
[IMPORTANT]
|
||||
================================
|
||||
Freezing an index will close the index and reopen it within the same API call. This causes primaries to not be allocated for a short
|
||||
amount of time and causes the cluster to go red until the primaries are allocated again. This limitation might be removed in the future.
|
||||
================================
|
|
@ -19,6 +19,10 @@ There are several thread pools, but the important ones include:
|
|||
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of
|
||||
`1000`.
|
||||
|
||||
[[search-throttled]]`search_throttled`::
|
||||
For count/search/suggest/get operations on `search_throttled indices`. Thread pool type is
|
||||
`fixed_auto_queue_size` with a size of `1`, and initial queue_size of `100`.
|
||||
|
||||
`get`::
|
||||
For get operations. Thread pool type is `fixed`
|
||||
with a size of `# of available processors`,
|
||||
|
|
|
@ -10,6 +10,7 @@ directly to configure and access {xpack} features.
|
|||
* <<info-api,Info API>>
|
||||
* <<ccr-apis,Cross-cluster replication APIs>>
|
||||
* <<graph-explore-api,Graph Explore API>>
|
||||
* <<freeze-index-api>>, <<unfreeze-index-api>>
|
||||
* <<index-lifecycle-management-api,Index lifecycle management APIs>>
|
||||
* <<licensing-apis,Licensing APIs>>
|
||||
* <<ml-apis,Machine Learning APIs>>
|
||||
|
@ -23,11 +24,13 @@ directly to configure and access {xpack} features.
|
|||
include::info.asciidoc[]
|
||||
include::{es-repo-dir}/ccr/apis/ccr-apis.asciidoc[]
|
||||
include::{es-repo-dir}/graph/explore.asciidoc[]
|
||||
include::{es-repo-dir}/indices/apis/freeze.asciidoc[]
|
||||
include::{es-repo-dir}/ilm/apis/ilm-api.asciidoc[]
|
||||
include::{es-repo-dir}/licensing/index.asciidoc[]
|
||||
include::{es-repo-dir}/migration/migration.asciidoc[]
|
||||
include::{es-repo-dir}/ml/apis/ml-api.asciidoc[]
|
||||
include::{es-repo-dir}/rollup/rollup-api.asciidoc[]
|
||||
include::{xes-repo-dir}/rest-api/security.asciidoc[]
|
||||
include::{es-repo-dir}/indices/apis/unfreeze.asciidoc[]
|
||||
include::{xes-repo-dir}/rest-api/watcher.asciidoc[]
|
||||
include::defs.asciidoc[]
|
||||
|
|
|
@ -109,7 +109,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
|
|||
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
|
||||
.indices(concreteIndices);
|
||||
|
||||
indexStateService.closeIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
|
||||
indexStateService.closeIndices(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(ClusterStateUpdateResponse response) {
|
||||
|
|
|
@ -28,7 +28,7 @@ public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdat
|
|||
|
||||
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
|
||||
|
||||
OpenIndexClusterStateUpdateRequest() {
|
||||
public OpenIndexClusterStateUpdateRequest() {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -40,10 +40,10 @@ public class OpenIndexResponse extends ShardsAcknowledgedResponse {
|
|||
declareAcknowledgedAndShardsAcknowledgedFields(PARSER);
|
||||
}
|
||||
|
||||
OpenIndexResponse() {
|
||||
public OpenIndexResponse() {
|
||||
}
|
||||
|
||||
OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
|
||||
public OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
|
||||
super(acknowledged, shardsAcknowledged);
|
||||
}
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ public class MetaDataIndexStateService {
|
|||
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
|
||||
}
|
||||
|
||||
public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
if (request.indices() == null || request.indices().length == 0) {
|
||||
throw new IllegalArgumentException("Index name is required");
|
||||
}
|
||||
|
@ -99,8 +99,14 @@ public class MetaDataIndexStateService {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return closeIndices(currentState, request.indices(), indicesAsString);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public ClusterState closeIndices(ClusterState currentState, final Index[] indices, String indicesAsString) {
|
||||
Set<IndexMetaData> indicesToClose = new HashSet<>();
|
||||
for (Index index : request.indices()) {
|
||||
for (Index index : indices) {
|
||||
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
|
||||
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
indicesToClose.add(indexMetaData);
|
||||
|
@ -138,8 +144,6 @@ public class MetaDataIndexStateService {
|
|||
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
|
||||
"indices closed [" + indicesAsString + "]");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void openIndex(final OpenIndexClusterStateUpdateRequest request,
|
||||
final ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.index.engine;
|
|||
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.SegmentCommitInfo;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
|
@ -66,7 +67,7 @@ public class ReadOnlyEngine extends Engine {
|
|||
private final IndexCommit indexCommit;
|
||||
private final Lock indexWriterLock;
|
||||
private final DocsStats docsStats;
|
||||
protected final RamAccountingSearcherFactory searcherFactory;
|
||||
private final RamAccountingSearcherFactory searcherFactory;
|
||||
|
||||
/**
|
||||
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
|
||||
|
@ -414,4 +415,8 @@ public class ReadOnlyEngine extends Engine {
|
|||
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
|
||||
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
|
||||
}
|
||||
|
||||
protected void processReaders(IndexReader reader, IndexReader previousReader) {
|
||||
searcherFactory.processReaders(reader, previousReader);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -389,6 +389,18 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
* @param listeners new listerns to use for the newly created shard
|
||||
*/
|
||||
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
|
||||
return reinitShard(current, routing, current.engineFactory, listeners);
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes an existing shard, closes it and starts a new initialing shard at the same location
|
||||
*
|
||||
* @param routing the shard routing to use for the newly created shard.
|
||||
* @param listeners new listerns to use for the newly created shard
|
||||
* @param engineFactory the engine factory for the new shard
|
||||
*/
|
||||
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, EngineFactory engineFactory,
|
||||
IndexingOperationListener... listeners) throws IOException {
|
||||
closeShards(current);
|
||||
return newShard(
|
||||
routing,
|
||||
|
@ -396,7 +408,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
current.indexSettings().getIndexMetaData(),
|
||||
null,
|
||||
null,
|
||||
current.engineFactory,
|
||||
engineFactory,
|
||||
current.getGlobalCheckpointSyncer(),
|
||||
EMPTY_EVENT_LISTENER, listeners);
|
||||
}
|
||||
|
|
|
@ -158,7 +158,7 @@ public final class FrozenEngine extends ReadOnlyEngine {
|
|||
listeners.beforeRefresh();
|
||||
}
|
||||
reader = DirectoryReader.open(engineConfig.getStore().directory());
|
||||
searcherFactory.processReaders(reader, null);
|
||||
processReaders(reader, null);
|
||||
reader = lastOpenedReader = wrapReader(reader, Function.identity());
|
||||
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
|
||||
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
|
||||
|
|
|
@ -6,13 +6,15 @@
|
|||
package org.elasticsearch.xpack.core;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.SecureString;
|
||||
import org.elasticsearch.license.LicensingClient;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction.FreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction.FreezeRequest;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction.FreezeResponse;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
|
||||
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
|
||||
|
@ -25,6 +27,7 @@ import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
|
|||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.BASIC_AUTH_HEADER;
|
||||
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
||||
|
@ -106,7 +109,20 @@ public class XPackClient {
|
|||
client.execute(XPackInfoAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public void freeze(TransportFreezeIndexAction.FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
|
||||
client.execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, request, listener);
|
||||
/**
|
||||
* Freezes or unfreeze one or more indices
|
||||
*/
|
||||
public void freeze(FreezeRequest request, ActionListener<FreezeResponse> listener) {
|
||||
client.execute(FreezeIndexAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Freeze or unfreeze one or more indices
|
||||
*/
|
||||
public FreezeResponse freeze(FreezeRequest request)
|
||||
throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<FreezeResponse> future = new PlainActionFuture<>();
|
||||
freeze(request, future);
|
||||
return future.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.beats.BeatsFeatureSetUsage;
|
||||
|
@ -344,7 +345,8 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
ExplainLifecycleAction.INSTANCE,
|
||||
RemoveIndexLifecyclePolicyAction.INSTANCE,
|
||||
MoveToStepAction.INSTANCE,
|
||||
RetryAction.INSTANCE
|
||||
RetryAction.INSTANCE,
|
||||
TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
|
|||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.rest.action.RestFreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
|
||||
|
@ -297,6 +298,7 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
|
|||
List<RestHandler> handlers = new ArrayList<>();
|
||||
handlers.add(new RestXPackInfoAction(settings, restController));
|
||||
handlers.add(new RestXPackUsageAction(settings, restController));
|
||||
handlers.add(new RestFreezeIndexAction(settings, restController));
|
||||
handlers.addAll(licensing.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings, settingsFilter,
|
||||
indexNameExpressionResolver, nodesInCluster));
|
||||
return handlers;
|
||||
|
|
|
@ -10,7 +10,10 @@ import org.elasticsearch.action.Action;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
|
@ -18,12 +21,14 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -39,22 +44,27 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public final class TransportFreezeIndexAction extends
|
||||
TransportMasterNodeAction<TransportFreezeIndexAction.FreezeRequest, AcknowledgedResponse> {
|
||||
TransportMasterNodeAction<TransportFreezeIndexAction.FreezeRequest, TransportFreezeIndexAction.FreezeResponse> {
|
||||
|
||||
private final DestructiveOperations destructiveOperations;
|
||||
private final MetaDataIndexStateService indexStateService;
|
||||
|
||||
@Inject
|
||||
public TransportFreezeIndexAction(TransportService transportService, ClusterService clusterService,
|
||||
public TransportFreezeIndexAction(MetaDataIndexStateService indexStateService, TransportService transportService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
DestructiveOperations destructiveOperations) {
|
||||
super(FreezeIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
|
||||
FreezeRequest::new);
|
||||
this.destructiveOperations = destructiveOperations;
|
||||
this.indexStateService = indexStateService;
|
||||
}
|
||||
@Override
|
||||
protected String executor() {
|
||||
|
@ -62,27 +72,78 @@ public final class TransportFreezeIndexAction extends
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
|
||||
protected void doExecute(Task task, FreezeRequest request, ActionListener<FreezeResponse> listener) {
|
||||
destructiveOperations.failDestructive(request.indices());
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
protected FreezeResponse newResponse() {
|
||||
return new FreezeResponse();
|
||||
}
|
||||
|
||||
private Index[] resolveIndices(FreezeRequest request, ClusterState state) {
|
||||
List<Index> indices = new ArrayList<>();
|
||||
for (Index index : indexNameExpressionResolver.concreteIndices(state, request)) {
|
||||
IndexMetaData metaData = state.metaData().index(index);
|
||||
Settings settings = metaData.getSettings();
|
||||
// only unfreeze if we are frozen and only freeze if we are not frozen already.
|
||||
// this prevents all indices that are already frozen that match a pattern to
|
||||
// go through the cycles again.
|
||||
if ((request.freeze() && FrozenEngine.INDEX_FROZEN.get(settings) == false) ||
|
||||
(request.freeze() == false && FrozenEngine.INDEX_FROZEN.get(settings))) {
|
||||
indices.add(index);
|
||||
}
|
||||
}
|
||||
if (indices.isEmpty() && request.indicesOptions().allowNoIndices() == false) {
|
||||
throw new ResourceNotFoundException("no index found to " + (request.freeze() ? "freeze" : "unfreeze"));
|
||||
}
|
||||
return indices.toArray(Index.EMPTY_ARRAY);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
|
||||
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
|
||||
if (concreteIndices == null || concreteIndices.length == 0) {
|
||||
throw new ResourceNotFoundException("index not found");
|
||||
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<FreezeResponse> listener) {
|
||||
final Index[] concreteIndices = resolveIndices(request, state);
|
||||
if (concreteIndices.length == 0) {
|
||||
listener.onResponse(new FreezeResponse(true, true));
|
||||
return;
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("toggle-frozen-settings",
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, listener) {
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, new ActionListener<AcknowledgedResponse>() {
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
|
||||
OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
|
||||
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
|
||||
.indices(concreteIndices).waitForActiveShards(request.waitForActiveShards());
|
||||
indexStateService.openIndex(updateRequest, new ActionListener<OpenIndexClusterStateUpdateResponse>() {
|
||||
@Override
|
||||
public void onResponse(OpenIndexClusterStateUpdateResponse openIndexClusterStateUpdateResponse) {
|
||||
listener.onResponse(new FreezeResponse(openIndexClusterStateUpdateResponse.isAcknowledged(),
|
||||
openIndexClusterStateUpdateResponse.isShardsAcknowledged()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
List<Index> toClose = new ArrayList<>();
|
||||
for (Index index : concreteIndices) {
|
||||
IndexMetaData metaData = currentState.metaData().index(index);
|
||||
if (metaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
toClose.add(index);
|
||||
}
|
||||
}
|
||||
currentState = indexStateService.closeIndices(currentState, toClose.toArray(new Index[0]), toClose.toString());
|
||||
final MetaData.Builder builder = MetaData.builder(currentState.metaData());
|
||||
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
for (Index index : concreteIndices) {
|
||||
|
@ -94,12 +155,13 @@ public final class TransportFreezeIndexAction extends
|
|||
final Settings.Builder settingsBuilder =
|
||||
Settings.builder()
|
||||
.put(currentState.metaData().index(index).getSettings())
|
||||
.put("index.blocks.write", request.freeze())
|
||||
.put(FrozenEngine.INDEX_FROZEN.getKey(), request.freeze())
|
||||
.put(IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), request.freeze());
|
||||
if (request.freeze()) {
|
||||
settingsBuilder.put("index.blocks.write", true);
|
||||
blocks.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK);
|
||||
} else {
|
||||
settingsBuilder.remove("index.blocks.write");
|
||||
blocks.removeIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK);
|
||||
}
|
||||
imdBuilder.settings(settingsBuilder);
|
||||
|
@ -121,7 +183,17 @@ public final class TransportFreezeIndexAction extends
|
|||
indexNameExpressionResolver.concreteIndexNames(state, request));
|
||||
}
|
||||
|
||||
public static class FreezeIndexAction extends Action<AcknowledgedResponse> {
|
||||
public static class FreezeResponse extends OpenIndexResponse {
|
||||
public FreezeResponse() {
|
||||
super();
|
||||
}
|
||||
|
||||
public FreezeResponse(boolean acknowledged, boolean shardsAcknowledged) {
|
||||
super(acknowledged, shardsAcknowledged);
|
||||
}
|
||||
}
|
||||
|
||||
public static class FreezeIndexAction extends Action<FreezeResponse> {
|
||||
|
||||
public static final FreezeIndexAction INSTANCE = new FreezeIndexAction();
|
||||
public static final String NAME = "indices:admin/freeze";
|
||||
|
@ -131,8 +203,8 @@ public final class TransportFreezeIndexAction extends
|
|||
}
|
||||
|
||||
@Override
|
||||
public AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
public FreezeResponse newResponse() {
|
||||
return new FreezeResponse();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,7 +212,8 @@ public final class TransportFreezeIndexAction extends
|
|||
implements IndicesRequest.Replaceable {
|
||||
private String[] indices;
|
||||
private boolean freeze = true;
|
||||
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, false, true);
|
||||
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
|
||||
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
|
||||
|
||||
public FreezeRequest(String... indices) {
|
||||
this.indices = indices;
|
||||
|
@ -155,8 +228,9 @@ public final class TransportFreezeIndexAction extends
|
|||
return validationException;
|
||||
}
|
||||
|
||||
public void setFreeze(boolean freeze) {
|
||||
public FreezeRequest setFreeze(boolean freeze) {
|
||||
this.freeze = freeze;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean freeze() {
|
||||
|
@ -169,6 +243,7 @@ public final class TransportFreezeIndexAction extends
|
|||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||
indices = in.readStringArray();
|
||||
freeze = in.readBoolean();
|
||||
waitForActiveShards = ActiveShardCount.readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,6 +252,7 @@ public final class TransportFreezeIndexAction extends
|
|||
indicesOptions.writeIndicesOptions(out);
|
||||
out.writeStringArray(indices);
|
||||
out.writeBoolean(freeze);
|
||||
waitForActiveShards.writeTo(out);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -215,5 +291,28 @@ public final class TransportFreezeIndexAction extends
|
|||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ActiveShardCount waitForActiveShards() {
|
||||
return waitForActiveShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of shard copies that should be active for indices opening to return.
|
||||
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
|
||||
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
|
||||
* wait for all shards (primary and all replicas) to be active before returning.
|
||||
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
|
||||
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
|
||||
* to wait for the desired amount of shard copies to become active before returning.
|
||||
* Indices opening will only wait up until the timeout value for the number of shard copies
|
||||
* to be active before returning. Check {@link OpenIndexResponse#isShardsAcknowledged()} to
|
||||
* determine if the requisite shard copies were all started before returning or timing out.
|
||||
*
|
||||
* @param waitForActiveShards number of active shard copies to wait on
|
||||
*/
|
||||
public FreezeRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
|
||||
this.waitForActiveShards = waitForActiveShards;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.rest.action;
|
||||
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.XPackClient;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.rest.XPackRestHandler;
|
||||
|
||||
public final class RestFreezeIndexAction extends XPackRestHandler {
|
||||
public RestFreezeIndexAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_freeze", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_unfreeze", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer doPrepareRequest(RestRequest request, XPackClient client) {
|
||||
boolean freeze = request.path().endsWith("/_freeze");
|
||||
TransportFreezeIndexAction.FreezeRequest freezeRequest =
|
||||
new TransportFreezeIndexAction.FreezeRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
freezeRequest.timeout(request.paramAsTime("timeout", freezeRequest.timeout()));
|
||||
freezeRequest.masterNodeTimeout(request.paramAsTime("master_timeout", freezeRequest.masterNodeTimeout()));
|
||||
freezeRequest.indicesOptions(IndicesOptions.fromRequest(request, freezeRequest.indicesOptions()));
|
||||
String waitForActiveShards = request.param("wait_for_active_shards");
|
||||
if (waitForActiveShards != null) {
|
||||
freezeRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
|
||||
}
|
||||
freezeRequest.setFreeze(freeze);
|
||||
return channel -> client.freeze(freezeRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "freeze_index";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class FrozenIndexRecoveryTests extends IndexShardTestCase {
|
||||
|
||||
/**
|
||||
* Make sure we can recover from a frozen engine
|
||||
*/
|
||||
public void testRecoverFromFrozenPrimary() throws IOException {
|
||||
IndexShard indexShard = newStartedShard(true);
|
||||
indexDoc(indexShard, "_doc", "1");
|
||||
indexDoc(indexShard, "_doc", "2");
|
||||
indexDoc(indexShard, "_doc", "3");
|
||||
indexShard.close("test", true);
|
||||
final ShardRouting shardRouting = indexShard.routingEntry();
|
||||
IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
|
||||
shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
|
||||
), FrozenEngine::new);
|
||||
recoverShardFromStore(frozenShard);
|
||||
assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
|
||||
assertDocCount(frozenShard, 3);
|
||||
|
||||
IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
|
||||
recoverReplica(replica, frozenShard, true);
|
||||
assertDocCount(replica, 3);
|
||||
closeShards(frozenShard, replica);
|
||||
}
|
||||
}
|
|
@ -6,14 +6,13 @@
|
|||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -39,6 +38,7 @@ import org.hamcrest.Matchers;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
|
@ -58,14 +58,8 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index"), future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index")));
|
||||
expectThrows(ClusterBlockException.class, () -> client().prepareIndex("index", "_doc", "4").setSource("field", "value")
|
||||
.setRefreshPolicy(IMMEDIATE).get());
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
|
@ -101,7 +95,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
} while (searchResponse.getHits().getHits().length > 0);
|
||||
}
|
||||
|
||||
public void testSearchAndGetAPIsAreThrottled() throws ExecutionException, InterruptedException, IOException {
|
||||
public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException, ExecutionException {
|
||||
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("_doc")
|
||||
.startObject("properties").startObject("field").field("type", "text").field("term_vector", "with_positions_offsets_payloads")
|
||||
.endObject().endObject()
|
||||
|
@ -110,15 +104,8 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
for (int i = 0; i < 10; i++) {
|
||||
client().prepareIndex("index", "_doc", "" + i).setSource("field", "foo bar baz").get();
|
||||
}
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("index");
|
||||
xPackClient.freeze(request, future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index")));
|
||||
int numRequests = randomIntBetween(20, 50);
|
||||
CountDownLatch latch = new CountDownLatch(numRequests);
|
||||
ActionListener listener = ActionListener.wrap(latch::countDown);
|
||||
|
@ -152,21 +139,17 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
assertEquals(numRefreshes, index.getTotal().refresh.getTotal());
|
||||
}
|
||||
|
||||
public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedException {
|
||||
public void testFreezeAndUnfreeze() throws InterruptedException, ExecutionException {
|
||||
createIndex("index", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
if (randomBoolean()) {
|
||||
// sometimes close it
|
||||
assertAcked(client().admin().indices().prepareClose("index").get());
|
||||
}
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("index");
|
||||
xPackClient.freeze(request, future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index")));
|
||||
{
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
Index index = resolveIndex("index");
|
||||
|
@ -175,12 +158,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
IndexShard shard = indexService.getShard(0);
|
||||
assertEquals(0, shard.refreshStats().getTotal());
|
||||
}
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
request.setFreeze(false);
|
||||
PlainActionFuture<AcknowledgedResponse> future1= new PlainActionFuture<>();
|
||||
xPackClient.freeze(request, future1);
|
||||
assertAcked(future1.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index").setFreeze(false)));
|
||||
{
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
Index index = resolveIndex("index");
|
||||
|
@ -193,16 +171,63 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
client().prepareIndex("index", "_doc", "4").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
}
|
||||
|
||||
public void testIndexMustBeClosed() {
|
||||
private void assertIndexFrozen(String idx) {
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
Index index = resolveIndex(idx);
|
||||
IndexService indexService = indexServices.indexServiceSafe(index);
|
||||
assertTrue(indexService.getIndexSettings().isSearchThrottled());
|
||||
assertTrue(FrozenEngine.INDEX_FROZEN.get(indexService.getIndexSettings().getSettings()));
|
||||
}
|
||||
|
||||
public void testDoubleFreeze() throws ExecutionException, InterruptedException {
|
||||
createIndex("test-idx", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("test-idx");
|
||||
xPackClient.freeze(request, future);
|
||||
ExecutionException executionException = expectThrows(ExecutionException.class, () -> future.get());
|
||||
assertThat(executionException.getCause(), Matchers.instanceOf(IllegalStateException.class));
|
||||
assertEquals("index [test-idx] is not closed", executionException.getCause().getMessage());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("test-idx")));
|
||||
ExecutionException executionException = expectThrows(ExecutionException.class,
|
||||
() -> xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("test-idx")
|
||||
.indicesOptions(new IndicesOptions(EnumSet.noneOf(IndicesOptions.Option.class),
|
||||
EnumSet.of(IndicesOptions.WildcardStates.OPEN)))));
|
||||
assertEquals("no index found to freeze", executionException.getCause().getMessage());
|
||||
}
|
||||
|
||||
public void testUnfreezeClosedIndices() throws ExecutionException, InterruptedException {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
createIndex("idx-closed", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("idx-closed", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx")));
|
||||
assertAcked(client().admin().indices().prepareClose("idx-closed").get());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx*").setFreeze(false)
|
||||
.indicesOptions(IndicesOptions.strictExpand())));
|
||||
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
|
||||
assertEquals(IndexMetaData.State.CLOSE, stateResponse.getState().getMetaData().index("idx-closed").getState());
|
||||
assertEquals(IndexMetaData.State.OPEN, stateResponse.getState().getMetaData().index("idx").getState());
|
||||
assertHitCount(client().prepareSearch().get(), 1L);
|
||||
}
|
||||
|
||||
public void testFreezePattern() throws ExecutionException, InterruptedException {
|
||||
createIndex("test-idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("test-idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
createIndex("test-idx-1", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("test-idx-1", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("test-idx")));
|
||||
assertIndexFrozen("test-idx");
|
||||
|
||||
IndicesStatsResponse index = client().admin().indices().prepareStats("test-idx").clear().setRefresh(true).get();
|
||||
assertEquals(0, index.getTotal().refresh.getTotal());
|
||||
assertHitCount(client().prepareSearch("test-idx").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED).get(), 1);
|
||||
index = client().admin().indices().prepareStats("test-idx").clear().setRefresh(true).get();
|
||||
assertEquals(1, index.getTotal().refresh.getTotal());
|
||||
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("test*")));
|
||||
assertIndexFrozen("test-idx");
|
||||
assertIndexFrozen("test-idx-1");
|
||||
index = client().admin().indices().prepareStats("test-idx").clear().setRefresh(true).get();
|
||||
assertEquals(1, index.getTotal().refresh.getTotal());
|
||||
index = client().admin().indices().prepareStats("test-idx-1").clear().setRefresh(true).get();
|
||||
assertEquals(0, index.getTotal().refresh.getTotal());
|
||||
}
|
||||
|
||||
public void testCanMatch() throws ExecutionException, InterruptedException, IOException {
|
||||
|
@ -232,15 +257,9 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, true, null, null)));
|
||||
}
|
||||
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("index");
|
||||
xPackClient.freeze(request, future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index").setWaitForActiveShards(ActiveShardCount.DEFAULT));
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index")));
|
||||
{
|
||||
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
|
@ -266,4 +285,43 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
|||
assertEquals(0, response.getTotal().refresh.getTotal()); // never opened a reader
|
||||
}
|
||||
}
|
||||
|
||||
public void testWriteToFrozenIndex() throws ExecutionException, InterruptedException {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx")));
|
||||
assertIndexFrozen("idx");
|
||||
expectThrows(ClusterBlockException.class, () ->
|
||||
client().prepareIndex("idx", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get());
|
||||
}
|
||||
|
||||
public void testIgnoreUnavailable() throws ExecutionException, InterruptedException {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
createIndex("idx-close", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
assertAcked(client().admin().indices().prepareClose("idx-close"));
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx*", "not_available")
|
||||
.indicesOptions(IndicesOptions.fromParameters(null, "true", null, null, IndicesOptions.strictExpandOpen()))));
|
||||
assertIndexFrozen("idx");
|
||||
assertEquals(IndexMetaData.State.CLOSE,
|
||||
client().admin().cluster().prepareState().get().getState().metaData().index("idx-close").getState());
|
||||
}
|
||||
|
||||
public void testUnfreezeClosedIndex() throws ExecutionException, InterruptedException {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx")));
|
||||
assertAcked(client().admin().indices().prepareClose("idx"));
|
||||
assertEquals(IndexMetaData.State.CLOSE,
|
||||
client().admin().cluster().prepareState().get().getState().metaData().index("idx").getState());
|
||||
expectThrows(ExecutionException.class,
|
||||
() -> xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("id*").setFreeze(false)
|
||||
.indicesOptions(new IndicesOptions(EnumSet.noneOf(IndicesOptions.Option.class),
|
||||
EnumSet.of(IndicesOptions.WildcardStates.OPEN)))));
|
||||
// we don't resolve to closed indices
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx").setFreeze(false)));
|
||||
assertEquals(IndexMetaData.State.OPEN,
|
||||
client().admin().cluster().prepareState().get().getState().metaData().index("idx").getState());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
{
|
||||
"indices.freeze": {
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/frozen.html",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/{index}/_freeze",
|
||||
"paths": [
|
||||
"/{index}/_freeze"
|
||||
],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the index to freeze"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
"timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout"
|
||||
},
|
||||
"master_timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Specify timeout for connection to master"
|
||||
},
|
||||
"ignore_unavailable": {
|
||||
"type" : "boolean",
|
||||
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
|
||||
},
|
||||
"allow_no_indices": {
|
||||
"type" : "boolean",
|
||||
"description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
|
||||
},
|
||||
"expand_wildcards": {
|
||||
"type" : "enum",
|
||||
"options" : ["open","closed","none","all"],
|
||||
"default" : "closed",
|
||||
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
|
||||
},
|
||||
"wait_for_active_shards": {
|
||||
"type" : "string",
|
||||
"description" : "Sets the number of active shards to wait for before the operation returns."
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
{
|
||||
"indices.unfreeze": {
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/frozen.html",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/{index}/_unfreeze",
|
||||
"paths": [ "/{index}/_unfreeze" ],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the index to unfreeze"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
"timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout"
|
||||
},
|
||||
"master_timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Specify timeout for connection to master"
|
||||
},
|
||||
"ignore_unavailable": {
|
||||
"type" : "boolean",
|
||||
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
|
||||
},
|
||||
"allow_no_indices": {
|
||||
"type" : "boolean",
|
||||
"description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
|
||||
},
|
||||
"expand_wildcards": {
|
||||
"type" : "enum",
|
||||
"options" : ["open","closed","none","all"],
|
||||
"default" : "closed",
|
||||
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
|
||||
},
|
||||
"wait_for_active_shards": {
|
||||
"type" : "string",
|
||||
"description" : "Sets the number of active shards to wait for before the operation returns."
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
---
|
||||
"Basic":
|
||||
|
||||
- skip:
|
||||
version: " - 6.99.99"
|
||||
reason: types are required in requests before 7.0.0
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
id: "1"
|
||||
body: { "foo": "Hello: 1" }
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
id: "2"
|
||||
body: { "foo": "Hello: 2" }
|
||||
|
||||
- do:
|
||||
indices.freeze:
|
||||
index: test
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: test
|
||||
ignore_throttled: false
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: hello
|
||||
|
||||
- match: {hits.total: 2}
|
||||
|
||||
# unfreeze
|
||||
- do:
|
||||
indices.unfreeze:
|
||||
index: test
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: _all
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: hello
|
||||
|
||||
- match: {hits.total: 2}
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test-01
|
||||
id: "1"
|
||||
body: { "foo": "Hello: 01" }
|
||||
|
||||
|
||||
- do:
|
||||
indices.freeze:
|
||||
index: test*
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: _all
|
||||
ignore_throttled: false
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: hello
|
||||
|
||||
- match: {hits.total: 3}
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: _all
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: hello
|
||||
|
||||
- match: {hits.total: 0}
|
||||
|
||||
---
|
||||
"Test index options":
|
||||
|
||||
- skip:
|
||||
version: " - 6.99.99"
|
||||
reason: types are required in requests before 7.0.0
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
id: "1"
|
||||
body: { "foo": "Hello: 1" }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test-close
|
||||
id: "1"
|
||||
body: { "foo": "Hello: 1" }
|
||||
|
||||
- do:
|
||||
indices.close:
|
||||
index: test-close
|
||||
|
||||
- do:
|
||||
indices.freeze:
|
||||
index: test*,not_available
|
||||
ignore_unavailable: true
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: _all
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: hello
|
||||
|
||||
- match: {hits.total: 0}
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: _all
|
||||
ignore_throttled: false
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: hello
|
||||
|
||||
- match: {hits.total: 1}
|
Loading…
Reference in New Issue