mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-24 17:09:48 +00:00
Fix expiration time in async search response (#55435)
This change ensures that we return the latest expiration time when retrieving the response from the index. This commit also fixes a bug that stops the garbage collection of saved responses if the async search index is deleted.
This commit is contained in:
parent
f13ebc4d4b
commit
0b3bdfcc3e
@ -32,8 +32,6 @@ public class GetAsyncSearchRequest implements Validatable {
|
||||
private TimeValue waitForCompletion;
|
||||
private TimeValue keepAlive;
|
||||
|
||||
public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();
|
||||
|
||||
private final String id;
|
||||
|
||||
public GetAsyncSearchRequest(String id) {
|
||||
@ -62,14 +60,7 @@ public class GetAsyncSearchRequest implements Validatable {
|
||||
|
||||
@Override
|
||||
public Optional<ValidationException> validate() {
|
||||
final ValidationException validationException = new ValidationException();
|
||||
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
|
||||
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
|
||||
}
|
||||
if (validationException.validationErrors().isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(validationException);
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -36,8 +36,6 @@ import java.util.Optional;
|
||||
*/
|
||||
public class SubmitAsyncSearchRequest implements Validatable {
|
||||
|
||||
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;
|
||||
|
||||
public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
|
||||
|
||||
private TimeValue waitForCompletionTimeout;
|
||||
|
@ -1,41 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.asyncsearch;
|
||||
|
||||
import org.elasticsearch.client.ValidationException;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class GetAsyncSearchRequestTests extends ESTestCase {
|
||||
|
||||
public void testValidation() {
|
||||
GetAsyncSearchRequest getAsyncSearchRequest = new GetAsyncSearchRequest(randomAlphaOfLength(10));
|
||||
getAsyncSearchRequest.setKeepAlive(new TimeValue(0));
|
||||
assertTrue(getAsyncSearchRequest.validate().isPresent());
|
||||
ValidationException validationException = getAsyncSearchRequest.validate().get();
|
||||
assertEquals(1, validationException.validationErrors().size());
|
||||
assertEquals("Validation Failed: 1: keep_alive must be greater than 1 minute, got: 0s;", validationException.getMessage());
|
||||
|
||||
getAsyncSearchRequest.setKeepAlive(new TimeValue(1, TimeUnit.MINUTES));
|
||||
assertFalse(getAsyncSearchRequest.validate().isPresent());
|
||||
}
|
||||
}
|
@ -63,7 +63,7 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
||||
|
||||
@Nullable
|
||||
public String getNodeId() {
|
||||
return nodeId.string();
|
||||
return nodeId != null ? nodeId.string() : null;
|
||||
}
|
||||
|
||||
public Text getNodeIdText() {
|
||||
|
@ -15,9 +15,9 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
@ -39,6 +39,8 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
|
||||
|
||||
public final class AsyncSearch extends Plugin implements ActionPlugin {
|
||||
private final Settings settings;
|
||||
|
||||
@ -84,11 +86,16 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
|
||||
AsyncSearchIndexService indexService =
|
||||
new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry);
|
||||
AsyncSearchMaintenanceService maintenanceService =
|
||||
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), threadPool, indexService, TimeValue.timeValueHours(1));
|
||||
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService);
|
||||
clusterService.addListener(maintenanceService);
|
||||
return Collections.singletonList(maintenanceService);
|
||||
} else {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
|
||||
}
|
||||
}
|
||||
|
@ -265,14 +265,16 @@ class AsyncSearchIndexService {
|
||||
return;
|
||||
}
|
||||
|
||||
if (restoreResponseHeaders) {
|
||||
if (restoreResponseHeaders && get.getSource().containsKey(RESPONSE_HEADERS_FIELD)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
|
||||
restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
|
||||
}
|
||||
|
||||
long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD);
|
||||
String encoded = (String) get.getSource().get(RESULT_FIELD);
|
||||
listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
|
||||
AsyncSearchResponse response = decodeResponse(encoded, expirationTime);
|
||||
listener.onResponse(encoded != null ? response : null);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
@ -331,11 +333,11 @@ class AsyncSearchIndexService {
|
||||
/**
|
||||
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
|
||||
*/
|
||||
AsyncSearchResponse decodeResponse(String value) throws IOException {
|
||||
AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException {
|
||||
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
|
||||
in.setVersion(Version.readVersion(in));
|
||||
return new AsyncSearchResponse(in);
|
||||
return new AsyncSearchResponse(in, expirationTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,8 @@ import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
@ -26,6 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD;
|
||||
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
|
||||
|
||||
/**
|
||||
* A service that runs a periodic cleanup over the async-search index.
|
||||
@ -33,23 +36,32 @@ import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_
|
||||
class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener {
|
||||
private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class);
|
||||
|
||||
/**
|
||||
* Controls the interval at which the cleanup is scheduled.
|
||||
* Defaults to 1h. It is an undocumented/expert setting that
|
||||
* is mainly used by integration tests to make the garbage
|
||||
* collection of search responses more reactive.
|
||||
*/
|
||||
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
|
||||
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);
|
||||
|
||||
private final String localNodeId;
|
||||
private final ThreadPool threadPool;
|
||||
private final AsyncSearchIndexService indexService;
|
||||
private final TimeValue delay;
|
||||
|
||||
private final AtomicBoolean isCleanupRunning = new AtomicBoolean(false);
|
||||
private boolean isCleanupRunning;
|
||||
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||
private volatile Scheduler.Cancellable cancellable;
|
||||
|
||||
AsyncSearchMaintenanceService(String localNodeId,
|
||||
Settings nodeSettings,
|
||||
ThreadPool threadPool,
|
||||
AsyncSearchIndexService indexService,
|
||||
TimeValue delay) {
|
||||
AsyncSearchIndexService indexService) {
|
||||
this.localNodeId = localNodeId;
|
||||
this.threadPool = threadPool;
|
||||
this.indexService = indexService;
|
||||
this.delay = delay;
|
||||
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -62,31 +74,30 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener
|
||||
tryStartCleanup(state);
|
||||
}
|
||||
|
||||
void tryStartCleanup(ClusterState state) {
|
||||
synchronized void tryStartCleanup(ClusterState state) {
|
||||
if (isClosed.get()) {
|
||||
return;
|
||||
}
|
||||
IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX);
|
||||
if (indexRouting == null) {
|
||||
if (isCleanupRunning.compareAndSet(true, false)) {
|
||||
close();
|
||||
}
|
||||
stop();
|
||||
return;
|
||||
}
|
||||
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
|
||||
if (localNodeId.equals(primaryNodeId)) {
|
||||
if (isCleanupRunning.compareAndSet(false, true)) {
|
||||
if (isCleanupRunning == false) {
|
||||
isCleanupRunning = true;
|
||||
executeNextCleanup();
|
||||
}
|
||||
} else if (isCleanupRunning.compareAndSet(true, false)) {
|
||||
close();
|
||||
} else {
|
||||
stop();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void executeNextCleanup() {
|
||||
if (isClosed.get() == false && isCleanupRunning.get()) {
|
||||
if (isClosed.get() == false && isCleanupRunning) {
|
||||
long nowInMillis = System.currentTimeMillis();
|
||||
DeleteByQueryRequest toDelete = new DeleteByQueryRequest()
|
||||
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX)
|
||||
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
|
||||
indexService.getClient()
|
||||
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup()));
|
||||
@ -94,7 +105,7 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener
|
||||
}
|
||||
|
||||
synchronized void scheduleNextCleanup() {
|
||||
if (isClosed.get() == false && isCleanupRunning.get()) {
|
||||
if (isClosed.get() == false && isCleanupRunning) {
|
||||
try {
|
||||
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
@ -107,11 +118,18 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void stop() {
|
||||
if (isCleanupRunning) {
|
||||
if (cancellable != null && cancellable.isCancelled() == false) {
|
||||
cancellable.cancel();
|
||||
}
|
||||
isCleanupRunning = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (cancellable != null && cancellable.isCancelled() == false) {
|
||||
cancellable.cancel();
|
||||
}
|
||||
stop();
|
||||
isClosed.compareAndSet(false, true);
|
||||
}
|
||||
}
|
||||
|
@ -297,6 +297,7 @@ final class AsyncSearchTask extends SearchTask {
|
||||
*/
|
||||
private AsyncSearchResponse getResponse() {
|
||||
assert searchResponse.get() != null;
|
||||
checkCancellation();
|
||||
return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
|
||||
}
|
||||
|
||||
@ -306,15 +307,17 @@ final class AsyncSearchTask extends SearchTask {
|
||||
*/
|
||||
private AsyncSearchResponse getResponseWithHeaders() {
|
||||
assert searchResponse.get() != null;
|
||||
checkCancellation();
|
||||
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
|
||||
}
|
||||
|
||||
|
||||
|
||||
// checks if the search task should be cancelled
|
||||
private void checkCancellation() {
|
||||
private synchronized void checkCancellation() {
|
||||
long now = System.currentTimeMillis();
|
||||
if (expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
|
||||
if (hasCompleted == false &&
|
||||
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
|
||||
// we cancel the search task if the initial submit task was cancelled,
|
||||
// this is needed because the task cancellation mechanism doesn't
|
||||
// handle the cancellation of grand-children.
|
||||
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.search;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
@ -25,6 +26,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
@ -187,7 +189,9 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
||||
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
|
||||
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
|
||||
exc -> {
|
||||
if (exc.getCause() instanceof DocumentMissingException == false) {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(exc);
|
||||
if (cause instanceof DocumentMissingException == false &&
|
||||
cause instanceof VersionConflictEngineException == false) {
|
||||
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
|
||||
searchTask.getSearchId().getEncoded()), exc);
|
||||
}
|
||||
|
@ -7,6 +7,7 @@
|
||||
package org.elasticsearch.xpack.search;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -31,10 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
// TODO: add tests for keepAlive and expiration
|
||||
public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
||||
private String indexName;
|
||||
private int numShards;
|
||||
@ -275,4 +277,117 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
||||
deleteAsyncSearch(response.getId());
|
||||
ensureTaskRemoval(response.getId());
|
||||
}
|
||||
|
||||
public void testUpdateRunningKeepAlive() throws Exception {
|
||||
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
|
||||
request.getSearchRequest().source(
|
||||
new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test"))
|
||||
);
|
||||
long now = System.currentTimeMillis();
|
||||
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
||||
AsyncSearchResponse response = submitAsyncSearch(request);
|
||||
assertNotNull(response.getSearchResponse());
|
||||
assertTrue(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
assertThat(response.getExpirationTime(), greaterThan(now));
|
||||
long expirationTime = response.getExpirationTime();
|
||||
|
||||
response = getAsyncSearch(response.getId());
|
||||
assertNotNull(response.getSearchResponse());
|
||||
assertTrue(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
|
||||
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
|
||||
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
|
||||
|
||||
assertTrue(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
|
||||
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
|
||||
assertThat(response.getExpirationTime(), lessThan(expirationTime));
|
||||
ensureTaskNotRunning(response.getId());
|
||||
ensureTaskRemoval(response.getId());
|
||||
}
|
||||
|
||||
public void testUpdateStoreKeepAlive() throws Exception {
|
||||
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
|
||||
long now = System.currentTimeMillis();
|
||||
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
|
||||
request.setKeepOnCompletion(true);
|
||||
AsyncSearchResponse response = submitAsyncSearch(request);
|
||||
assertNotNull(response.getSearchResponse());
|
||||
assertFalse(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
assertThat(response.getExpirationTime(), greaterThan(now));
|
||||
long expirationTime = response.getExpirationTime();
|
||||
|
||||
response = getAsyncSearch(response.getId());
|
||||
assertNotNull(response.getSearchResponse());
|
||||
assertFalse(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
|
||||
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
|
||||
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
|
||||
|
||||
assertFalse(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
|
||||
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
|
||||
assertThat(response.getExpirationTime(), lessThan(expirationTime));
|
||||
ensureTaskNotRunning(response.getId());
|
||||
ensureTaskRemoval(response.getId());
|
||||
}
|
||||
|
||||
public void testRemoveAsyncIndex() throws Exception {
|
||||
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
|
||||
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
|
||||
request.setKeepOnCompletion(true);
|
||||
long now = System.currentTimeMillis();
|
||||
AsyncSearchResponse response = submitAsyncSearch(request);
|
||||
assertNotNull(response.getSearchResponse());
|
||||
assertFalse(response.isRunning());
|
||||
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
||||
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
assertThat(response.getExpirationTime(), greaterThan(now));
|
||||
|
||||
// remove the async search index
|
||||
client().admin().indices().prepareDelete(AsyncSearchIndexService.INDEX).get();
|
||||
|
||||
Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId()));
|
||||
Throwable cause = exc instanceof ExecutionException ?
|
||||
ExceptionsHelper.unwrapCause(exc.getCause()) : ExceptionsHelper.unwrapCause(exc);
|
||||
assertThat(ExceptionsHelper.status(cause).getStatus(), equalTo(404));
|
||||
|
||||
SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName);
|
||||
newReq.getSearchRequest().source(
|
||||
new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test"))
|
||||
);
|
||||
newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
||||
AsyncSearchResponse newResp = submitAsyncSearch(newReq);
|
||||
assertNotNull(newResp.getSearchResponse());
|
||||
assertTrue(newResp.isRunning());
|
||||
assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards));
|
||||
assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
||||
assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0));
|
||||
long expirationTime = newResp.getExpirationTime();
|
||||
|
||||
// check garbage collection
|
||||
newResp = getAsyncSearch(newResp.getId(), TimeValue.timeValueMillis(1));
|
||||
assertThat(newResp.getExpirationTime(), lessThan(expirationTime));
|
||||
ensureTaskNotRunning(newResp.getId());
|
||||
ensureTaskRemoval(newResp.getId());
|
||||
}
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
AsyncSearchResponse response = randomAsyncSearchResponse(randomSearchId(), randomSearchResponse());
|
||||
String encoded = indexService.encodeResponse(response);
|
||||
AsyncSearchResponse same = indexService.decodeResponse(encoded);
|
||||
AsyncSearchResponse same = indexService.decodeResponse(encoded, response.getExpirationTime());
|
||||
assertEqualResponses(response, same);
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
package org.elasticsearch.xpack.search;
|
||||
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
|
||||
@ -49,6 +50,7 @@ import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
|
||||
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
@ -72,6 +74,14 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
|
||||
.put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(0))
|
||||
.put(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(100))
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart the node that runs the {@link TaskId} decoded from the provided {@link AsyncSearchId}.
|
||||
*/
|
||||
@ -97,6 +107,10 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
|
||||
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id)).get();
|
||||
}
|
||||
|
||||
protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) throws ExecutionException, InterruptedException {
|
||||
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id).setKeepAlive(keepAlive)).get();
|
||||
}
|
||||
|
||||
protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
|
||||
return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncSearchAction.Request(id)).get();
|
||||
}
|
||||
@ -115,6 +129,19 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
|
||||
});
|
||||
}
|
||||
|
||||
protected void ensureTaskNotRunning(String id) throws Exception {
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
AsyncSearchResponse resp = getAsyncSearch(id);
|
||||
assertFalse(resp.isRunning());
|
||||
} catch (Exception exc) {
|
||||
if (ExceptionsHelper.unwrapCause(exc.getCause()) instanceof ResourceNotFoundException == false) {
|
||||
throw exc;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait the completion of the {@link TaskId} decoded from the provided {@link AsyncSearchId}.
|
||||
*/
|
||||
|
@ -34,8 +34,4 @@ public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase<
|
||||
return AsyncSearchId.encode(UUIDs.randomBase64UUID(),
|
||||
new TaskId(randomAlphaOfLengthBetween(10, 20), randomLongBetween(0, Long.MAX_VALUE)));
|
||||
}
|
||||
|
||||
public void testValidateWaitForCompletion() {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
|
||||
private final boolean isPartial;
|
||||
|
||||
private final long startTimeMillis;
|
||||
private final long expirationTimeMillis;
|
||||
private long expirationTimeMillis;
|
||||
|
||||
/**
|
||||
* Creates an {@link AsyncSearchResponse} with meta-information only (not-modified).
|
||||
@ -74,13 +74,18 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
|
||||
}
|
||||
|
||||
public AsyncSearchResponse(StreamInput in) throws IOException {
|
||||
this(in, null);
|
||||
}
|
||||
|
||||
public AsyncSearchResponse(StreamInput in, Long expirationTime) throws IOException {
|
||||
this.id = in.readOptionalString();
|
||||
this.error = in.readOptionalWriteable(ElasticsearchException::new);
|
||||
this.searchResponse = in.readOptionalWriteable(SearchResponse::new);
|
||||
this.isPartial = in.readBoolean();
|
||||
this.isRunning = in.readBoolean();
|
||||
this.startTimeMillis = in.readLong();
|
||||
this.expirationTimeMillis = in.readLong();
|
||||
long origExpiration = in.readLong();
|
||||
this.expirationTimeMillis = expirationTime == null ? origExpiration : expirationTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -157,6 +162,10 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
|
||||
return expirationTimeMillis;
|
||||
}
|
||||
|
||||
public void setExpirationTime(long expirationTimeMillis) {
|
||||
this.expirationTimeMillis = expirationTimeMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
if (searchResponse == null || isPartial) {
|
||||
|
@ -16,9 +16,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest.MIN_KEEP_ALIVE;
|
||||
|
||||
public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
|
||||
public static final GetAsyncSearchAction INSTANCE = new GetAsyncSearchAction();
|
||||
public static final String NAME = "indices:data/read/async_search/get";
|
||||
@ -63,12 +60,7 @@ public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (keepAlive.getMillis() != -1 && keepAlive.getMillis() < MIN_KEEP_ALIVE) {
|
||||
validationException =
|
||||
addValidationError("keep_alive must be greater than 1 minute, got:" + keepAlive.toString(), validationException);
|
||||
}
|
||||
return validationException;
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user