Add a java level freeze/unfreeze API (#35353)
This change adds a high level freeze API that allows to mark an index as frozen and vice versa. Indices must be closed in order to become frozen and an open but frozen index must be closed to be defrosted. This change also adds a index.frozen setting to mark frozen indices and integrates the frozen engine with the SearchOperationListener that resets and releases the directory reader after and before search phases. Relates to #34352 Depends on #34357
This commit is contained in:
parent
ba478827ad
commit
64df803af0
|
@ -31,7 +31,11 @@ import org.apache.lucene.store.AlreadyClosedException;
|
|||
import org.apache.lucene.util.Bits;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.shard.SearchOperationListener;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
|
@ -59,6 +63,8 @@ import java.util.function.Function;
|
|||
* stats in order to obtain the number of reopens.
|
||||
*/
|
||||
public final class FrozenEngine extends ReadOnlyEngine {
|
||||
public static final Setting<Boolean> INDEX_FROZEN = Setting.boolSetting("index.frozen", false, Setting.Property.IndexScope,
|
||||
Setting.Property.PrivateIndex);
|
||||
private volatile DirectoryReader lastOpenedReader;
|
||||
|
||||
public FrozenEngine(EngineConfig config) {
|
||||
|
@ -232,6 +238,49 @@ public final class FrozenEngine extends ReadOnlyEngine {
|
|||
return null;
|
||||
}
|
||||
|
||||
/*
|
||||
* We register this listener for a frozen index that will
|
||||
* 1. reset the reader every time the search context is validated which happens when the context is looked up ie. on a fetch phase
|
||||
* etc.
|
||||
* 2. register a releasable resource that is cleaned after each phase that releases the reader for this searcher
|
||||
*/
|
||||
public static class ReacquireEngineSearcherListener implements SearchOperationListener {
|
||||
|
||||
@Override
|
||||
public void validateSearchContext(SearchContext context, TransportRequest transportRequest) {
|
||||
Searcher engineSearcher = context.searcher().getEngineSearcher();
|
||||
LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader());
|
||||
if (lazyDirectoryReader != null) {
|
||||
try {
|
||||
lazyDirectoryReader.reset();
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
// also register a release resource in this case if we have multiple roundtrips like in DFS
|
||||
registerRelease(context, lazyDirectoryReader);
|
||||
}
|
||||
}
|
||||
|
||||
private void registerRelease(SearchContext context, LazyDirectoryReader lazyDirectoryReader) {
|
||||
context.addReleasable(() -> {
|
||||
try {
|
||||
lazyDirectoryReader.release();
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}, SearchContext.Lifetime.PHASE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNewContext(SearchContext context) {
|
||||
Searcher engineSearcher = context.searcher().getEngineSearcher();
|
||||
LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader());
|
||||
if (lazyDirectoryReader != null) {
|
||||
registerRelease(context, lazyDirectoryReader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class allows us to use the same high level reader across multiple search phases but replace the underpinnings
|
||||
* on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases.
|
||||
|
|
|
@ -6,11 +6,13 @@
|
|||
package org.elasticsearch.xpack.core;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.SecureString;
|
||||
import org.elasticsearch.license.LicensingClient;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
|
||||
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
|
||||
|
@ -103,4 +105,8 @@ public class XPackClient {
|
|||
public void info(XPackInfoRequest request, ActionListener<XPackInfoResponse> listener) {
|
||||
client.execute(XPackInfoAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public void freeze(TransportFreezeIndexAction.FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
|
||||
client.execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, request, listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,8 +37,10 @@ import org.elasticsearch.common.settings.SettingsFilter;
|
|||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.FrozenEngine;
|
||||
import org.elasticsearch.license.LicenseService;
|
||||
import org.elasticsearch.license.LicensesMetaData;
|
||||
import org.elasticsearch.license.Licensing;
|
||||
|
@ -55,6 +57,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.snapshots.SourceOnlySnapshotRepository;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
|
||||
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
|
@ -266,6 +269,8 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
|
|||
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
|
||||
actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class));
|
||||
actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class));
|
||||
actions.add(new ActionHandler<>(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
|
||||
TransportFreezeIndexAction.class));
|
||||
actions.addAll(licensing.getActions());
|
||||
return actions;
|
||||
}
|
||||
|
@ -359,7 +364,10 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
|
|||
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
|
||||
if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) {
|
||||
return Optional.of(SourceOnlySnapshotRepository.getEngineFactory());
|
||||
} else if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) {
|
||||
return Optional.of(FrozenEngine::new);
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
|
@ -367,6 +375,15 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
|
|||
public List<Setting<?>> getSettings() {
|
||||
List<Setting<?>> settings = super.getSettings();
|
||||
settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
|
||||
settings.add(FrozenEngine.INDEX_FROZEN);
|
||||
return settings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIndexModule(IndexModule indexModule) {
|
||||
if (FrozenEngine.INDEX_FROZEN.get(indexModule.getSettings())) {
|
||||
indexModule.addSearchOperationListener(new FrozenEngine.ReacquireEngineSearcherListener());
|
||||
}
|
||||
super.onIndexModule(indexModule);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,219 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.FrozenEngine;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public final class TransportFreezeIndexAction extends
|
||||
TransportMasterNodeAction<TransportFreezeIndexAction.FreezeRequest, AcknowledgedResponse> {
|
||||
|
||||
private final DestructiveOperations destructiveOperations;
|
||||
|
||||
@Inject
|
||||
public TransportFreezeIndexAction(TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
DestructiveOperations destructiveOperations) {
|
||||
super(FreezeIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
|
||||
FreezeRequest::new);
|
||||
this.destructiveOperations = destructiveOperations;
|
||||
}
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
|
||||
destructiveOperations.failDestructive(request.indices());
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
|
||||
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
|
||||
if (concreteIndices == null || concreteIndices.length == 0) {
|
||||
throw new ResourceNotFoundException("index not found");
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("toggle-frozen-settings",
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, listener) {
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
final MetaData.Builder builder = MetaData.builder(currentState.metaData());
|
||||
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
for (Index index : concreteIndices) {
|
||||
IndexMetaData meta = currentState.metaData().getIndexSafe(index);
|
||||
if (meta.getState() != IndexMetaData.State.CLOSE) {
|
||||
throw new IllegalStateException("index [" + index.getName() + "] is not closed");
|
||||
}
|
||||
final IndexMetaData.Builder imdBuilder = IndexMetaData.builder(meta);
|
||||
final Settings.Builder settingsBuilder =
|
||||
Settings.builder()
|
||||
.put(currentState.metaData().index(index).getSettings())
|
||||
.put("index.blocks.write", request.freeze())
|
||||
.put(FrozenEngine.INDEX_FROZEN.getKey(), request.freeze())
|
||||
.put(IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), request.freeze());
|
||||
if (request.freeze()) {
|
||||
blocks.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK);
|
||||
} else {
|
||||
blocks.removeIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK);
|
||||
}
|
||||
imdBuilder.settings(settingsBuilder);
|
||||
builder.put(imdBuilder.build(), true);
|
||||
}
|
||||
return ClusterState.builder(currentState).blocks(blocks).metaData(builder).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||
return new AcknowledgedResponse(acknowledged);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(FreezeRequest request, ClusterState state) {
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
|
||||
indexNameExpressionResolver.concreteIndexNames(state, request));
|
||||
}
|
||||
|
||||
public static class FreezeIndexAction extends Action<AcknowledgedResponse> {
|
||||
|
||||
public static final FreezeIndexAction INSTANCE = new FreezeIndexAction();
|
||||
public static final String NAME = "indices:admin/freeze";
|
||||
|
||||
private FreezeIndexAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
}
|
||||
|
||||
public static class FreezeRequest extends AcknowledgedRequest<FreezeRequest>
|
||||
implements IndicesRequest.Replaceable {
|
||||
private String[] indices;
|
||||
private boolean freeze = true;
|
||||
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, false, true);
|
||||
|
||||
public FreezeRequest(String... indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (CollectionUtils.isEmpty(indices)) {
|
||||
validationException = addValidationError("index is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
public void setFreeze(boolean freeze) {
|
||||
this.freeze = freeze;
|
||||
}
|
||||
|
||||
public boolean freeze() {
|
||||
return freeze;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||
indices = in.readStringArray();
|
||||
freeze = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
indicesOptions.writeIndicesOptions(out);
|
||||
out.writeStringArray(indices);
|
||||
out.writeBoolean(freeze);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indices to be frozen or unfrozen
|
||||
*/
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore and how to deal with wildcard expressions.
|
||||
* For example indices that don't exist.
|
||||
*
|
||||
* @return the current behaviour when it comes to index names and wildcard indices expressions
|
||||
*/
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return indicesOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore and how to deal with wildcard expressions.
|
||||
* For example indices that don't exist.
|
||||
*
|
||||
* @param indicesOptions the desired behaviour regarding indices to ignore and wildcard indices expressions
|
||||
* @return the request itself
|
||||
*/
|
||||
public FreezeRequest indicesOptions(IndicesOptions indicesOptions) {
|
||||
this.indicesOptions = indicesOptions;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesRequest indices(String... indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.xpack.core.XPackClient;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
||||
public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return pluginList(XPackPlugin.class);
|
||||
}
|
||||
|
||||
public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedException {
|
||||
createIndex("index", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("index"), future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
expectThrows(ClusterBlockException.class, () -> client().prepareIndex("index", "_doc", "4").setSource("field", "value")
|
||||
.setRefreshPolicy(IMMEDIATE).get());
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
Index index = resolveIndex("index");
|
||||
IndexService indexService = indexServices.indexServiceSafe(index);
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
Engine engine = IndexShardTestCase.getEngine(shard);
|
||||
assertEquals(0, shard.refreshStats().getTotal());
|
||||
boolean useDFS = randomBoolean();
|
||||
assertHitCount(client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
|
||||
.setSearchType(useDFS ? SearchType.DFS_QUERY_THEN_FETCH : SearchType.QUERY_THEN_FETCH).get(), 3);
|
||||
assertThat(engine, Matchers.instanceOf(FrozenEngine.class));
|
||||
assertEquals(useDFS ? 3 : 2, shard.refreshStats().getTotal());
|
||||
assertFalse(((FrozenEngine)engine).isReaderOpen());
|
||||
assertTrue(indexService.getIndexSettings().isSearchThrottled());
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
assertNotNull(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()));
|
||||
}
|
||||
// now scroll
|
||||
SearchResponse searchResponse = client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
|
||||
.setScroll(TimeValue.timeValueMinutes(1)).setSize(1).get();
|
||||
do {
|
||||
assertHitCount(searchResponse, 3);
|
||||
assertEquals(1, searchResponse.getHits().getHits().length);
|
||||
SearchService searchService = getInstanceFromNode(SearchService.class);
|
||||
assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1));
|
||||
for (int i = 0; i < 2; i++) {
|
||||
shard = indexService.getShard(i);
|
||||
engine = IndexShardTestCase.getEngine(shard);
|
||||
assertFalse(((FrozenEngine) engine).isReaderOpen());
|
||||
}
|
||||
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
|
||||
} while (searchResponse.getHits().getHits().length > 0);
|
||||
}
|
||||
|
||||
public void testSearchAndGetAPIsAreThrottled() throws ExecutionException, InterruptedException, IOException {
|
||||
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("_doc")
|
||||
.startObject("properties").startObject("field").field("type", "text").field("term_vector", "with_positions_offsets_payloads")
|
||||
.endObject().endObject()
|
||||
.endObject().endObject();
|
||||
createIndex("index", Settings.builder().put("index.number_of_shards", 2).build(), "_doc", mapping);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
client().prepareIndex("index", "_doc", "" + i).setSource("field", "foo bar baz").get();
|
||||
}
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("index");
|
||||
xPackClient.freeze(request, future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
int numRequests = randomIntBetween(20, 50);
|
||||
CountDownLatch latch = new CountDownLatch(numRequests);
|
||||
ActionListener listener = ActionListener.wrap(latch::countDown);
|
||||
int numRefreshes = 0;
|
||||
for (int i = 0; i < numRequests; i++) {
|
||||
numRefreshes++;
|
||||
switch (randomIntBetween(0, 3)) {
|
||||
case 0:
|
||||
client().prepareGet("index", "_doc", "" + randomIntBetween(0, 9)).execute(listener);
|
||||
break;
|
||||
case 1:
|
||||
client().prepareSearch("index").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
|
||||
.setSearchType(SearchType.QUERY_THEN_FETCH).execute(listener);
|
||||
// in total 4 refreshes 1x query & 1x fetch per shard (we have 2)
|
||||
numRefreshes += 3;
|
||||
break;
|
||||
case 2:
|
||||
client().prepareTermVectors("index", "_doc", "" + randomIntBetween(0, 9)).execute(listener);
|
||||
break;
|
||||
case 3:
|
||||
client().prepareExplain("index", "_doc", "" + randomIntBetween(0, 9)).setQuery(new MatchAllQueryBuilder())
|
||||
.execute(listener);
|
||||
break;
|
||||
default:
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
latch.await();
|
||||
IndicesStatsResponse index = client().admin().indices().prepareStats("index").clear().setRefresh(true).get();
|
||||
assertEquals(numRefreshes, index.getTotal().refresh.getTotal());
|
||||
}
|
||||
|
||||
public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedException {
|
||||
createIndex("index", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
|
||||
client().admin().indices().prepareFlush("index").get();
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("index");
|
||||
xPackClient.freeze(request, future);
|
||||
assertAcked(future.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
{
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
Index index = resolveIndex("index");
|
||||
IndexService indexService = indexServices.indexServiceSafe(index);
|
||||
assertTrue(indexService.getIndexSettings().isSearchThrottled());
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
assertEquals(0, shard.refreshStats().getTotal());
|
||||
}
|
||||
client().admin().indices().prepareClose("index").get();
|
||||
request.setFreeze(false);
|
||||
PlainActionFuture<AcknowledgedResponse> future1= new PlainActionFuture<>();
|
||||
xPackClient.freeze(request, future1);
|
||||
assertAcked(future1.get());
|
||||
assertAcked(client().admin().indices().prepareOpen("index"));
|
||||
{
|
||||
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
|
||||
Index index = resolveIndex("index");
|
||||
IndexService indexService = indexServices.indexServiceSafe(index);
|
||||
assertFalse(indexService.getIndexSettings().isSearchThrottled());
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
Engine engine = IndexShardTestCase.getEngine(shard);
|
||||
assertThat(engine, Matchers.instanceOf(InternalEngine.class));
|
||||
}
|
||||
client().prepareIndex("index", "_doc", "4").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
}
|
||||
|
||||
public void testIndexMustBeClosed() {
|
||||
createIndex("test-idx", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
|
||||
TransportFreezeIndexAction.FreezeRequest request =
|
||||
new TransportFreezeIndexAction.FreezeRequest("test-idx");
|
||||
xPackClient.freeze(request, future);
|
||||
ExecutionException executionException = expectThrows(ExecutionException.class, () -> future.get());
|
||||
assertThat(executionException.getCause(), Matchers.instanceOf(IllegalStateException.class));
|
||||
assertEquals("index [test-idx] is not closed", executionException.getCause().getMessage());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue