From 8c7ef2417f8fb8b272bd2a7fc86436327d9400b1 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 23 Apr 2020 18:02:17 -0400 Subject: [PATCH] Make AsyncSearchIndexService reusable (#55598) EQL will require very similar functionality to async search. This PR refactors AsyncSearchIndexService to make it reusable for EQL. Supersedes #55119 Relates to #49638 --- .../xpack/search/AsyncSearchSecurityIT.java | 5 +- .../xpack/search/AsyncSearch.java | 9 +- .../search/AsyncSearchMaintenanceService.java | 112 +-------------- .../xpack/search/AsyncSearchTask.java | 18 ++- .../xpack/search/MutableSearchResponse.java | 4 +- .../TransportDeleteAsyncSearchAction.java | 16 ++- .../search/TransportGetAsyncSearchAction.java | 17 ++- .../TransportSubmitAsyncSearchAction.java | 24 ++-- .../xpack/search/AsyncSearchActionIT.java | 2 +- .../search/AsyncSearchIntegTestCase.java | 15 +- .../xpack/search/AsyncSearchTaskTests.java | 7 +- .../search/GetAsyncSearchRequestTests.java | 3 +- .../xpack/core/async/AsyncExecutionId.java} | 36 ++--- .../xpack/core/async/AsyncResponse.java | 22 +++ .../xpack/core/async/AsyncTask.java | 24 ++++ .../core/async/AsyncTaskIndexService.java} | 117 ++++++++-------- .../async/AsyncTaskMaintenanceService.java | 129 ++++++++++++++++++ .../search/action/AsyncSearchResponse.java | 18 ++- .../core/async/AsyncExecutionIdTests.java} | 18 +-- .../async/AsyncSearchIndexServiceTests.java | 86 ++++++++++++ .../core/async/AsyncTaskServiceTests.java} | 31 ++--- 21 files changed, 449 insertions(+), 264 deletions(-) rename x-pack/plugin/{async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchId.java => core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java} (71%) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResponse.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java rename x-pack/plugin/{async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java => core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java} (78%) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java rename x-pack/plugin/{async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIdTests.java => core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java} (60%) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java rename x-pack/plugin/{async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java => core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java} (80%) diff --git a/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java b/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java index bfca713db53..60102798f6f 100644 --- a/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java +++ b/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.junit.Before; import java.io.IOException; @@ -28,7 +29,7 @@ import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; -import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX; +import static org.elasticsearch.xpack.search.AsyncSearch.INDEX; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -81,7 +82,7 @@ public class AsyncSearchSecurityIT extends ESRestTestCase { assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404)); // other and user cannot access the result from direct get calls - AsyncSearchId searchId = AsyncSearchId.decode(id); + AsyncExecutionId searchId = AsyncExecutionId.decode(id); for (String runAs : new String[] {user, other}) { exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs)); assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java index bae2274e636..41a25f2ce26 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java @@ -29,6 +29,8 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; @@ -39,9 +41,11 @@ import java.util.Collections; import java.util.List; import java.util.function.Supplier; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; public final class AsyncSearch extends Plugin implements ActionPlugin { + public static final String INDEX = ".async-search"; private final Settings settings; public AsyncSearch(Settings settings) { @@ -83,8 +87,9 @@ public final class AsyncSearch extends Plugin implements ActionPlugin { Supplier repositoriesServiceSupplier) { if (DiscoveryNode.isDataNode(environment.settings())) { // only data nodes should be eligible to run the maintenance service. - AsyncSearchIndexService indexService = - new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry); + AsyncTaskIndexService indexService = + new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, + AsyncSearchResponse::new, namedWriteableRegistry); AsyncSearchMaintenanceService maintenanceService = new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService); clusterService.addListener(maintenanceService); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java index d0fd14f409f..462ad15a14b 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java @@ -6,35 +6,14 @@ package org.elasticsearch.xpack.search; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.reindex.DeleteByQueryAction; -import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD; -import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX; - -/** - * A service that runs a periodic cleanup over the async-search index. - */ -class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener { - private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class); +public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService { /** * Controls the interval at which the cleanup is scheduled. @@ -45,91 +24,10 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener public static final Setting ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING = Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope); - private final String localNodeId; - private final ThreadPool threadPool; - private final AsyncSearchIndexService indexService; - private final TimeValue delay; - - private boolean isCleanupRunning; - private final AtomicBoolean isClosed = new AtomicBoolean(false); - private volatile Scheduler.Cancellable cancellable; - AsyncSearchMaintenanceService(String localNodeId, Settings nodeSettings, ThreadPool threadPool, - AsyncSearchIndexService indexService) { - this.localNodeId = localNodeId; - this.threadPool = threadPool; - this.indexService = indexService; - this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings); - } - - @Override - public void clusterChanged(ClusterChangedEvent event) { - final ClusterState state = event.state(); - if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - // Wait until the gateway has recovered from disk. - return; - } - tryStartCleanup(state); - } - - synchronized void tryStartCleanup(ClusterState state) { - if (isClosed.get()) { - return; - } - IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX); - if (indexRouting == null) { - stop(); - return; - } - String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId(); - if (localNodeId.equals(primaryNodeId)) { - if (isCleanupRunning == false) { - isCleanupRunning = true; - executeNextCleanup(); - } - } else { - stop(); - } - } - - synchronized void executeNextCleanup() { - if (isClosed.get() == false && isCleanupRunning) { - long nowInMillis = System.currentTimeMillis(); - DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX) - .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis)); - indexService.getClient() - .execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup())); - } - } - - synchronized void scheduleNextCleanup() { - if (isClosed.get() == false && isCleanupRunning) { - try { - cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC); - } catch (EsRejectedExecutionException e) { - if (e.isExecutorShutdown()) { - logger.debug("failed to schedule next maintenance task; shutting down", e); - } else { - throw e; - } - } - } - } - - synchronized void stop() { - if (isCleanupRunning) { - if (cancellable != null && cancellable.isCancelled() == false) { - cancellable.cancel(); - } - isCleanupRunning = false; - } - } - - @Override - public void close() { - stop(); - isClosed.compareAndSet(false, true); + AsyncTaskIndexService indexService) { + super(AsyncSearch.INDEX, localNodeId, threadPool, indexService, ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings)); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 41d8c53267d..95941439293 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -27,6 +27,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import java.util.ArrayList; @@ -42,9 +44,9 @@ import java.util.function.Supplier; /** * Task that tracks the progress of a currently running {@link SearchRequest}. */ -final class AsyncSearchTask extends SearchTask { +final class AsyncSearchTask extends SearchTask implements AsyncTask { private final BooleanSupplier checkSubmitCancellation; - private final AsyncSearchId searchId; + private final AsyncExecutionId searchId; private final Client client; private final ThreadPool threadPool; private final Supplier aggReduceContextSupplier; @@ -73,7 +75,7 @@ final class AsyncSearchTask extends SearchTask { * @param checkSubmitCancellation A boolean supplier that checks if the submit task has been cancelled. * @param originHeaders All the request context headers. * @param taskHeaders The filtered request headers for the task. - * @param searchId The {@link AsyncSearchId} of the task. + * @param searchId The {@link AsyncExecutionId} of the task. * @param threadPool The threadPool to schedule runnable. * @param aggReduceContextSupplier A supplier to create final reduce contexts. */ @@ -85,7 +87,7 @@ final class AsyncSearchTask extends SearchTask { TimeValue keepAlive, Map originHeaders, Map taskHeaders, - AsyncSearchId searchId, + AsyncExecutionId searchId, Client client, ThreadPool threadPool, Supplier aggReduceContextSupplier) { @@ -104,14 +106,16 @@ final class AsyncSearchTask extends SearchTask { /** * Returns all of the request contexts headers */ - Map getOriginHeaders() { + @Override + public Map getOriginHeaders() { return originHeaders; } /** - * Returns the {@link AsyncSearchId} of the task + * Returns the {@link AsyncExecutionId} of the task */ - AsyncSearchId getSearchId() { + @Override + public AsyncExecutionId getExecutionId() { return searchId; } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java index 7b01cee01b1..985e22b5e7f 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java @@ -28,7 +28,7 @@ import java.util.function.Supplier; import static java.util.Collections.singletonList; import static org.apache.lucene.search.TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; import static org.elasticsearch.search.aggregations.InternalAggregations.topLevelReduce; -import static org.elasticsearch.xpack.search.AsyncSearchIndexService.restoreResponseHeadersContext; +import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreResponseHeadersContext; /** * A mutable search response that allows to update and create partial response synchronously. @@ -156,7 +156,7 @@ class MutableSearchResponse { } else { resp = null; } - return new AsyncSearchResponse(task.getSearchId().getEncoded(), resp, failure, isPartial, + return new AsyncSearchResponse(task.getExecutionId().getEncoded(), resp, failure, isPartial, frozen == false, task.getStartTime(), expirationTime); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java index 62c08ff37d5..ae628edc28e 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java @@ -26,16 +26,21 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; import java.io.IOException; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; + public class TransportDeleteAsyncSearchAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class); private final ClusterService clusterService; private final TransportService transportService; - private final AsyncSearchIndexService store; + private final AsyncTaskIndexService store; @Inject public TransportDeleteAsyncSearchAction(TransportService transportService, @@ -45,7 +50,8 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, + ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); this.clusterService = clusterService; this.transportService = transportService; } @@ -53,7 +59,7 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction listener) { try { - AsyncSearchId searchId = AsyncSearchId.decode(request.getId()); + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) { cancelTaskAndDeleteResult(searchId, listener); @@ -67,8 +73,8 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction listener) throws IOException { - AsyncSearchTask task = store.getTask(taskManager, searchId); + void cancelTaskAndDeleteResult(AsyncExecutionId searchId, ActionListener listener) throws IOException { + AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class); if (task != null) { //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. task.cancelTask(() -> store.deleteResponse(searchId, diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index 0e00f779ca3..f2638d94857 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -24,14 +24,18 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; + public class TransportGetAsyncSearchAction extends HandledTransportAction { private final Logger logger = LogManager.getLogger(TransportGetAsyncSearchAction.class); private final ClusterService clusterService; private final TransportService transportService; - private final AsyncSearchIndexService store; + private final AsyncTaskIndexService store; @Inject public TransportGetAsyncSearchAction(TransportService transportService, @@ -43,14 +47,15 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, + ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); } @Override protected void doExecute(Task task, GetAsyncSearchAction.Request request, ActionListener listener) { try { long nowInMillis = System.currentTimeMillis(); - AsyncSearchId searchId = AsyncSearchId.decode(request.getId()); + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) { if (request.getKeepAlive().getMillis() > 0) { @@ -82,13 +87,13 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction listener) { try { - final AsyncSearchTask task = store.getTask(taskManager, searchId); + final AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class); if (task == null) { getSearchResponseFromIndex(searchId, request, nowInMillis, listener); return; @@ -118,7 +123,7 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction listener) { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 45628a9ccfd..10b937f7119 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -34,6 +34,8 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; @@ -42,6 +44,8 @@ import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; + public class TransportSubmitAsyncSearchAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportSubmitAsyncSearchAction.class); @@ -49,7 +53,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction requestToAggReduceContextBuilder; private final TransportSearchAction searchAction; private final ThreadContext threadContext; - private final AsyncSearchIndexService store; + private final AsyncTaskIndexService store; @Inject public TransportSubmitAsyncSearchAction(ClusterService clusterService, @@ -65,7 +69,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction searchService.aggReduceContextBuilder(request).forFinalReduction(); this.searchAction = searchAction; this.threadContext = transportService.getThreadPool().getThreadContext(); - this.store = new AsyncSearchIndexService(clusterService, threadContext, client, registry); + this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadContext, client, + ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); } @Override @@ -88,7 +93,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction taskHeaders) { - AsyncSearchId searchId = new AsyncSearchId(docID, new TaskId(nodeClient.getLocalNodeId(), id)); + AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id)); Supplier aggReduceContextSupplier = () -> requestToAggReduceContextBuilder.apply(request.getSearchRequest()); return new AsyncSearchTask(id, type, action, parentTaskId, @@ -176,29 +181,30 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getSearchId()), exc); + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getExecutionId()), exc); unregisterTaskAndMoveOn(searchTask, nextAction); })); return; } try { - store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response, + store.storeFinalResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { Throwable cause = ExceptionsHelper.unwrapCause(exc); if (cause instanceof DocumentMissingException == false && cause instanceof VersionConflictEngineException == false) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", - searchTask.getSearchId().getEncoded()), exc); + searchTask.getExecutionId().getEncoded()), exc); } unregisterTaskAndMoveOn(searchTask, nextAction); })); } catch (Exception exc) { - logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc); + logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()), + exc); unregisterTaskAndMoveOn(searchTask, nextAction); } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index d0275a988a2..c600940dae3 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -364,7 +364,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase { assertThat(response.getExpirationTime(), greaterThan(now)); // remove the async search index - client().admin().indices().prepareDelete(AsyncSearchIndexService.INDEX).get(); + client().admin().indices().prepareDelete(AsyncSearch.INDEX).get(); Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId())); Throwable cause = exc instanceof ExecutionException ? diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 3ff7b358e5f..6794a07ce61 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -27,6 +27,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; @@ -49,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX; +import static org.elasticsearch.xpack.search.AsyncSearch.INDEX; import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -83,10 +84,10 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { } /** - * Restart the node that runs the {@link TaskId} decoded from the provided {@link AsyncSearchId}. + * Restart the node that runs the {@link TaskId} decoded from the provided {@link AsyncExecutionId}. */ protected void restartTaskNode(String id) throws Exception { - AsyncSearchId searchId = AsyncSearchId.decode(id); + AsyncExecutionId searchId = AsyncExecutionId.decode(id); final ClusterStateResponse clusterState = client().admin().cluster() .prepareState().clear().setNodes(true).get(); DiscoveryNode node = clusterState.getState().nodes().get(searchId.getTaskId().getNodeId()); @@ -116,10 +117,10 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { } /** - * Wait the removal of the document decoded from the provided {@link AsyncSearchId}. + * Wait the removal of the document decoded from the provided {@link AsyncExecutionId}. */ protected void ensureTaskRemoval(String id) throws Exception { - AsyncSearchId searchId = AsyncSearchId.decode(id); + AsyncExecutionId searchId = AsyncExecutionId.decode(id); assertBusy(() -> { GetResponse resp = client().prepareGet() .setIndex(INDEX) @@ -143,11 +144,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { } /** - * Wait the completion of the {@link TaskId} decoded from the provided {@link AsyncSearchId}. + * Wait the completion of the {@link TaskId} decoded from the provided {@link AsyncExecutionId}. */ protected void ensureTaskCompletion(String id) throws Exception { assertBusy(() -> { - TaskId taskId = AsyncSearchId.decode(id).getTaskId(); + TaskId taskId = AsyncExecutionId.decode(id).getTaskId(); try { GetTaskResponse resp = client().admin().cluster() .prepareGetTask(taskId).get(); diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index 560685e33ca..4e5b6da703d 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.junit.After; import org.junit.Before; @@ -66,7 +67,7 @@ public class AsyncSearchTaskTests extends ESTestCase { public void testWaitForInit() throws InterruptedException { AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1), - Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)), + Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); int numShards = randomIntBetween(0, 10); List shards = new ArrayList<>(); @@ -106,7 +107,7 @@ public class AsyncSearchTaskTests extends ESTestCase { public void testWithFailure() throws InterruptedException { AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1), - Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)), + Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); int numThreads = randomIntBetween(1, 10); CountDownLatch latch = new CountDownLatch(numThreads); @@ -134,7 +135,7 @@ public class AsyncSearchTaskTests extends ESTestCase { public void testWaitForCompletion() throws InterruptedException { AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1), - Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)), + Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); int numShards = randomIntBetween(0, 10); List shards = new ArrayList<>(); diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java index 831dfc0e91f..533ae8210ae 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase { @@ -31,7 +32,7 @@ public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase< } static String randomSearchId() { - return AsyncSearchId.encode(UUIDs.randomBase64UUID(), + return AsyncExecutionId.encode(UUIDs.randomBase64UUID(), new TaskId(randomAlphaOfLengthBetween(10, 20), randomLongBetween(0, Long.MAX_VALUE))); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchId.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java similarity index 71% rename from x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchId.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java index ef8b47ee003..edd24297d4a 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchId.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.search; +package org.elasticsearch.xpack.core.async; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; @@ -17,14 +17,14 @@ import java.util.Base64; import java.util.Objects; /** - * A class that contains all information related to a submitted async search. + * A class that contains all information related to a submitted async execution. */ -class AsyncSearchId { +public final class AsyncExecutionId { private final String docId; private final TaskId taskId; private final String encoded; - AsyncSearchId(String docId, TaskId taskId) { + public AsyncExecutionId(String docId, TaskId taskId) { this.docId = docId; this.taskId = taskId; this.encoded = encode(docId, taskId); @@ -33,21 +33,21 @@ class AsyncSearchId { /** * The document id of the response in the index if the task is not running. */ - String getDocId() { + public String getDocId() { return docId; } /** - * The {@link TaskId} of the async search in the task manager. + * The {@link TaskId} of the async execution in the task manager. */ - TaskId getTaskId() { + public TaskId getTaskId() { return taskId; } /** - * Gets the encoded string that represents this search. + * Gets the encoded string that represents this execution. */ - String getEncoded() { + public String getEncoded() { return encoded; } @@ -55,7 +55,7 @@ class AsyncSearchId { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - AsyncSearchId searchId = (AsyncSearchId) o; + AsyncExecutionId searchId = (AsyncExecutionId) o; return docId.equals(searchId.docId) && taskId.equals(searchId.taskId); } @@ -67,17 +67,17 @@ class AsyncSearchId { @Override public String toString() { - return "AsyncSearchId{" + + return "AsyncExecutionId{" + "docId='" + docId + '\'' + ", taskId=" + taskId + '}'; } /** - * Encodes the informations needed to retrieve a async search response + * Encodes the informations needed to retrieve a async response * in a base64 encoded string. */ - static String encode(String docId, TaskId taskId) { + public static String encode(String docId, TaskId taskId) { try (BytesStreamOutput out = new BytesStreamOutput()) { out.writeString(docId); out.writeString(taskId.toString()); @@ -88,13 +88,13 @@ class AsyncSearchId { } /** - * Decodes a base64 encoded string into an {@link AsyncSearchId} that can be used - * to retrieve the response of an async search. + * Decodes a base64 encoded string into an {@link AsyncExecutionId} that can be used + * to retrieve the response of an async execution. */ - static AsyncSearchId decode(String id) { - final AsyncSearchId searchId; + public static AsyncExecutionId decode(String id) { + final AsyncExecutionId searchId; try (StreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getUrlDecoder().decode(id)))) { - searchId = new AsyncSearchId(in.readString(), new TaskId(in.readString())); + searchId = new AsyncExecutionId(in.readString(), new TaskId(in.readString())); if (in.available() > 0) { throw new IllegalArgumentException("invalid id:[" + id + "]"); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResponse.java new file mode 100644 index 00000000000..ca572dc8e21 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResponse.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.common.io.stream.Writeable; + +public interface AsyncResponse> extends Writeable { + /** + * When this response will expire as a timestamp in milliseconds since epoch. + */ + long getExpirationTime(); + + /** + * Returns a copy of this object with a new expiration time + */ + T withExpirationTime(long expirationTimeMillis); + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java new file mode 100644 index 00000000000..18ce4a0fc68 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import java.util.Map; + +/** + * A task that supports asynchronous execution and provides information necessary for safe temporary storage of results + */ +public interface AsyncTask { + /** + * Returns all of the request contexts headers + */ + Map getOriginHeaders(); + + /** + * Returns the {@link AsyncExecutionId} of the task + */ + AsyncExecutionId getExecutionId(); +} diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java similarity index 78% rename from x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 4e20113d3fd..4c27ee371c4 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.search; +package org.elasticsearch.xpack.core.async; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -50,16 +51,13 @@ import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; -import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY; /** - * A service that exposes the CRUD operations for the async-search index. + * A service that exposes the CRUD operations for the async task-specific index. */ -class AsyncSearchIndexService { - private static final Logger logger = LogManager.getLogger(AsyncSearchIndexService.class); - - public static final String INDEX = ".async-search"; +public final class AsyncTaskIndexService> { + private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class); public static final String HEADERS_FIELD = "headers"; public static final String RESPONSE_HEADERS_FIELD = "response_headers"; @@ -103,25 +101,33 @@ class AsyncSearchIndexService { return builder; } + private final String index; private final ClusterService clusterService; private final Client client; private final SecurityContext securityContext; private final NamedWriteableRegistry registry; + private final Writeable.Reader reader; - AsyncSearchIndexService(ClusterService clusterService, - ThreadContext threadContext, - Client client, - NamedWriteableRegistry registry) { + + public AsyncTaskIndexService(String index, + ClusterService clusterService, + ThreadContext threadContext, + Client client, + String origin, + Writeable.Reader reader, + NamedWriteableRegistry registry) { + this.index = index; this.clusterService = clusterService; this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext); - this.client = new OriginSettingClient(client, ASYNC_SEARCH_ORIGIN); + this.client = new OriginSettingClient(client, origin); this.registry = registry; + this.reader = reader; } /** * Returns the internal client with origin. */ - Client getClient() { + public Client getClient() { return client; } @@ -129,9 +135,9 @@ class AsyncSearchIndexService { * Creates the index with the expected settings and mappings if it doesn't exist. */ void createIndexIfNecessary(ActionListener listener) { - if (clusterService.state().routingTable().hasIndex(AsyncSearchIndexService.INDEX) == false) { + if (clusterService.state().routingTable().hasIndex(index) == false) { try { - client.admin().indices().prepareCreate(INDEX) + client.admin().indices().prepareCreate(index) .setSettings(settings()) .addMapping(SINGLE_MAPPING_NAME, mappings()) .execute(ActionListener.wrap( @@ -140,12 +146,12 @@ class AsyncSearchIndexService { if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) { listener.onResponse(null); } else { - logger.error("failed to create async-search index", exc); + logger.error("failed to create " + index + " index", exc); listener.onFailure(exc); } })); } catch (Exception exc) { - logger.error("failed to create async-search index", exc); + logger.error("failed to create " + index + " index", exc); listener.onFailure(exc); } } else { @@ -157,15 +163,15 @@ class AsyncSearchIndexService { * Stores the initial response with the original headers of the authenticated user * and the expected expiration time. */ - void storeInitialResponse(String docId, + public void storeInitialResponse(String docId, Map headers, - AsyncSearchResponse response, + R response, ActionListener listener) throws IOException { Map source = new HashMap<>(); source.put(HEADERS_FIELD, headers); source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime()); source.put(RESULT_FIELD, encodeResponse(response)); - IndexRequest indexRequest = new IndexRequest(INDEX) + IndexRequest indexRequest = new IndexRequest(index) .create(true) .id(docId) .source(source, XContentType.JSON); @@ -175,15 +181,15 @@ class AsyncSearchIndexService { /** * Stores the final response if the place-holder document is still present (update). */ - void storeFinalResponse(String docId, + public void storeFinalResponse(String docId, Map> responseHeaders, - AsyncSearchResponse response, + R response, ActionListener listener) throws IOException { Map source = new HashMap<>(); source.put(RESPONSE_HEADERS_FIELD, responseHeaders); source.put(RESULT_FIELD, encodeResponse(response)); UpdateRequest request = new UpdateRequest() - .index(INDEX) + .index(index) .id(docId) .doc(source, XContentType.JSON); client.update(request, listener); @@ -193,48 +199,48 @@ class AsyncSearchIndexService { * Updates the expiration time of the provided docId if the place-holder * document is still present (update). */ - void updateExpirationTime(String docId, + public void updateExpirationTime(String docId, long expirationTimeMillis, ActionListener listener) { Map source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis); - UpdateRequest request = new UpdateRequest().index(INDEX) + UpdateRequest request = new UpdateRequest().index(index) .id(docId) .doc(source, XContentType.JSON); client.update(request, listener); } /** - * Deletes the provided searchId from the index if present. + * Deletes the provided asyncTaskId from the index if present. */ - void deleteResponse(AsyncSearchId searchId, - ActionListener listener) { - DeleteRequest request = new DeleteRequest(INDEX).id(searchId.getDocId()); + public void deleteResponse(AsyncExecutionId asyncExecutionId, + ActionListener listener) { + DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId()); client.delete(request, listener); } /** - * Returns the {@link AsyncSearchTask} if the provided searchId + * Returns the {@link AsyncTask} if the provided asyncTaskId * is registered in the task manager, null otherwise. * * This method throws a {@link ResourceNotFoundException} if the authenticated user * is not the creator of the original task. */ - AsyncSearchTask getTask(TaskManager taskManager, AsyncSearchId searchId) throws IOException { - Task task = taskManager.getTask(searchId.getTaskId().getId()); - if (task instanceof AsyncSearchTask == false) { + public T getTask(TaskManager taskManager, AsyncExecutionId asyncExecutionId, Class tClass) throws IOException { + Task task = taskManager.getTask(asyncExecutionId.getTaskId().getId()); + if (tClass.isInstance(task) == false) { return null; } - AsyncSearchTask searchTask = (AsyncSearchTask) task; - if (searchTask.getSearchId().equals(searchId) == false) { + @SuppressWarnings("unchecked") T asyncTask = (T) task; + if (asyncTask.getExecutionId().equals(asyncExecutionId) == false) { return null; } // Check authentication for the user final Authentication auth = securityContext.getAuthentication(); - if (ensureAuthenticatedUserIsSame(searchTask.getOriginHeaders(), auth) == false) { - throw new ResourceNotFoundException(searchId.getEncoded() + " not found"); + if (ensureAuthenticatedUserIsSame(asyncTask.getOriginHeaders(), auth) == false) { + throw new ResourceNotFoundException(asyncExecutionId.getEncoded() + " not found"); } - return searchTask; + return asyncTask; } /** @@ -243,25 +249,25 @@ class AsyncSearchIndexService { * When the provided restoreResponseHeaders is true, this method also restores the * response headers of the original request in the current thread context. */ - void getResponse(AsyncSearchId searchId, - boolean restoreResponseHeaders, - ActionListener listener) { + public void getResponse(AsyncExecutionId asyncExecutionId, + boolean restoreResponseHeaders, + ActionListener listener) { final Authentication current = securityContext.getAuthentication(); - GetRequest internalGet = new GetRequest(INDEX) - .preference(searchId.getEncoded()) - .id(searchId.getDocId()); + GetRequest internalGet = new GetRequest(index) + .preference(asyncExecutionId.getEncoded()) + .id(asyncExecutionId.getDocId()); client.get(internalGet, ActionListener.wrap( get -> { if (get.isExists() == false) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded())); return; } - // check the authentication of the current user against the user that initiated the async search + // check the authentication of the current user against the user that initiated the async task @SuppressWarnings("unchecked") Map headers = (Map) get.getSource().get(HEADERS_FIELD); if (ensureAuthenticatedUserIsSame(headers, current) == false) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded())); return; } @@ -273,8 +279,11 @@ class AsyncSearchIndexService { long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD); String encoded = (String) get.getSource().get(RESULT_FIELD); - AsyncSearchResponse response = decodeResponse(encoded, expirationTime); - listener.onResponse(encoded != null ? response : null); + if (encoded != null) { + listener.onResponse(decodeResponse(encoded).withExpirationTime(expirationTime)); + } else { + listener.onResponse(null); + } }, listener::onFailure )); @@ -299,7 +308,7 @@ class AsyncSearchIndexService { } /** - * Compares the {@link Authentication} that was used to create the {@link AsyncSearchId} with the + * Compares the {@link Authentication} that was used to create the {@link AsyncExecutionId} with the * current authentication. */ boolean ensureAuthenticatedUserIsSame(Authentication original, Authentication current) { @@ -322,7 +331,7 @@ class AsyncSearchIndexService { /** * Encode the provided response in a binary form using base64 encoding. */ - String encodeResponse(AsyncSearchResponse response) throws IOException { + String encodeResponse(R response) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { Version.writeVersion(Version.CURRENT, out); response.writeTo(out); @@ -333,11 +342,11 @@ class AsyncSearchIndexService { /** * Decode the provided base-64 bytes into a {@link AsyncSearchResponse}. */ - AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException { + R decodeResponse(String value) throws IOException { try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) { try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) { in.setVersion(Version.readVersion(in)); - return new AsyncSearchResponse(in, expirationTime); + return reader.read(in); } } } @@ -345,7 +354,7 @@ class AsyncSearchIndexService { /** * Restores the provided responseHeaders to the current thread context. */ - static void restoreResponseHeadersContext(ThreadContext threadContext, Map> responseHeaders) { + public static void restoreResponseHeadersContext(ThreadContext threadContext, Map> responseHeaders) { for (Map.Entry> entry : responseHeaders.entrySet()) { for (String value : entry.getValue()) { threadContext.addResponseHeader(entry.getKey(), value); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java new file mode 100644 index 00000000000..c700729243c --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATION_TIME_FIELD; + +/** + * A service that runs a periodic cleanup over the async execution index. + *

+ * Since we will have several injected implementation of this class injected into different transports, and we bind components created + * by {@linkplain org.elasticsearch.plugins.Plugin#createComponents} to their classes, we need to implement one class per binding. + */ +public abstract class AsyncTaskMaintenanceService implements Releasable, ClusterStateListener { + private static final Logger logger = LogManager.getLogger(AsyncTaskMaintenanceService.class); + + private final String index; + private final String localNodeId; + private final ThreadPool threadPool; + private final AsyncTaskIndexService indexService; + private final TimeValue delay; + + private boolean isCleanupRunning; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private volatile Scheduler.Cancellable cancellable; + + public AsyncTaskMaintenanceService(String index, + String localNodeId, + ThreadPool threadPool, + AsyncTaskIndexService indexService, + TimeValue delay) { + this.index = index; + this.localNodeId = localNodeId; + this.threadPool = threadPool; + this.indexService = indexService; + this.delay = delay; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + final ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // Wait until the gateway has recovered from disk. + return; + } + tryStartCleanup(state); + } + + synchronized void tryStartCleanup(ClusterState state) { + if (isClosed.get()) { + return; + } + IndexRoutingTable indexRouting = state.routingTable().index(index); + if (indexRouting == null) { + stop(); + return; + } + String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId(); + if (localNodeId.equals(primaryNodeId)) { + if (isCleanupRunning == false) { + isCleanupRunning = true; + executeNextCleanup(); + } + } else { + stop(); + } + } + + synchronized void executeNextCleanup() { + if (isClosed.get() == false && isCleanupRunning) { + long nowInMillis = System.currentTimeMillis(); + DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index) + .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis)); + indexService.getClient() + .execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup)); + } + } + + synchronized void scheduleNextCleanup() { + if (isClosed.get() == false && isCleanupRunning) { + try { + cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug("failed to schedule next maintenance task; shutting down", e); + } else { + throw e; + } + } + } + } + + synchronized void stop() { + if (isCleanupRunning) { + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); + } + isCleanupRunning = false; + } + } + + @Override + public void close() { + stop(); + isClosed.compareAndSet(false, true); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java index bb988cb1fad..3ec98c9048c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.async.AsyncResponse; import java.io.IOException; @@ -22,7 +23,7 @@ import static org.elasticsearch.rest.RestStatus.OK; /** * A response of an async search request. */ -public class AsyncSearchResponse extends ActionResponse implements StatusToXContentObject { +public class AsyncSearchResponse extends ActionResponse implements StatusToXContentObject, AsyncResponse { @Nullable private final String id; @Nullable @@ -33,7 +34,7 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont private final boolean isPartial; private final long startTimeMillis; - private long expirationTimeMillis; + private final long expirationTimeMillis; /** * Creates an {@link AsyncSearchResponse} with meta-information only (not-modified). @@ -74,18 +75,13 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont } public AsyncSearchResponse(StreamInput in) throws IOException { - this(in, null); - } - - public AsyncSearchResponse(StreamInput in, Long expirationTime) throws IOException { this.id = in.readOptionalString(); this.error = in.readOptionalWriteable(ElasticsearchException::new); this.searchResponse = in.readOptionalWriteable(SearchResponse::new); this.isPartial = in.readBoolean(); this.isRunning = in.readBoolean(); this.startTimeMillis = in.readLong(); - long origExpiration = in.readLong(); - this.expirationTimeMillis = expirationTime == null ? origExpiration : expirationTime; + this.expirationTimeMillis = in.readLong(); } @Override @@ -158,12 +154,14 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont /** * When this response will expired as a timestamp in milliseconds since epoch. */ + @Override public long getExpirationTime() { return expirationTimeMillis; } - public void setExpirationTime(long expirationTimeMillis) { - this.expirationTimeMillis = expirationTimeMillis; + @Override + public AsyncSearchResponse withExpirationTime(long expirationTimeMillis) { + return new AsyncSearchResponse(id, searchResponse, error, isPartial, isRunning, startTimeMillis, expirationTimeMillis); } @Override diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIdTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java similarity index 60% rename from x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIdTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java index ebfa4bfc971..cff463f14aa 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIdTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java @@ -4,35 +4,35 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.search; +package org.elasticsearch.xpack.core.async; import org.elasticsearch.common.UUIDs; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; -public class AsyncSearchIdTests extends ESTestCase { +public class AsyncExecutionIdTests extends ESTestCase { public void testEncode() { for (int i = 0; i < 10; i++) { - AsyncSearchId instance = new AsyncSearchId(UUIDs.randomBase64UUID(), + AsyncExecutionId instance = new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(randomAlphaOfLengthBetween(5, 20), randomNonNegativeLong())); - String encoded = AsyncSearchId.encode(instance.getDocId(), instance.getTaskId()); - AsyncSearchId same = AsyncSearchId.decode(encoded); + String encoded = AsyncExecutionId.encode(instance.getDocId(), instance.getTaskId()); + AsyncExecutionId same = AsyncExecutionId.decode(encoded); assertEquals(same, instance); - AsyncSearchId mutate = mutate(instance); + AsyncExecutionId mutate = mutate(instance); assertNotEquals(mutate, instance); assertNotEquals(mutate, same); } } - private AsyncSearchId mutate(AsyncSearchId id) { + private AsyncExecutionId mutate(AsyncExecutionId id) { int rand = randomIntBetween(0, 1); switch (rand) { case 0: - return new AsyncSearchId(randomAlphaOfLength(id.getDocId().length()+1), id.getTaskId()); + return new AsyncExecutionId(randomAlphaOfLength(id.getDocId().length()+1), id.getTaskId()); case 1: - return new AsyncSearchId(id.getDocId(), + return new AsyncExecutionId(id.getDocId(), new TaskId(randomAlphaOfLength(id.getTaskId().getNodeId().length()), randomNonNegativeLong())); default: diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java new file mode 100644 index 00000000000..7457acfefca --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; +import static org.hamcrest.Matchers.equalTo; + +// TODO: test CRUD operations +public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase { + private AsyncTaskIndexService indexService; + + public static class TestAsyncResponse implements AsyncResponse { + private final String test; + private final long expirationTimeMillis; + + public TestAsyncResponse(String test, long expirationTimeMillis) { + this.test = test; + this.expirationTimeMillis = expirationTimeMillis; + } + + public TestAsyncResponse(StreamInput input) throws IOException { + test = input.readOptionalString(); + this.expirationTimeMillis = input.readLong(); + } + + @Override + public long getExpirationTime() { + return 0; + } + + @Override + public TestAsyncResponse withExpirationTime(long expirationTime) { + return new TestAsyncResponse(test, expirationTime); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(test); + out.writeLong(expirationTimeMillis); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestAsyncResponse that = (TestAsyncResponse) o; + return expirationTimeMillis == that.expirationTimeMillis && + Objects.equals(test, that.test); + } + + @Override + public int hashCode() { + return Objects.hash(test, expirationTimeMillis); + } + } + + @Before + public void setup() { + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + TransportService transportService = getInstanceFromNode(TransportService.class); + indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), + client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry()); + } + + public void testEncodeSearchResponse() throws IOException { + for (int i = 0; i < 10; i++) { + TestAsyncResponse response = new TestAsyncResponse(randomAlphaOfLength(10), randomLong()); + String encoded = indexService.encodeResponse(response); + TestAsyncResponse same = indexService.decodeResponse(encoded); + assertThat(same, equalTo(response)); + } + } +} diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java similarity index 80% rename from x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index dc33276cacd..28e9a9c806a 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.search; +package org.elasticsearch.xpack.core.async; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -23,30 +23,19 @@ import java.io.IOException; import java.util.Collections; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.assertEqualResponses; -import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.randomAsyncSearchResponse; -import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.randomSearchResponse; -import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId; - // TODO: test CRUD operations -public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase { - private AsyncSearchIndexService indexService; +public class AsyncTaskServiceTests extends ESSingleNodeTestCase { + private AsyncTaskIndexService indexService; + + public String index = ".async-search"; @Before public void setup() { ClusterService clusterService = getInstanceFromNode(ClusterService.class); TransportService transportService = getInstanceFromNode(TransportService.class); - indexService = new AsyncSearchIndexService(clusterService, transportService.getThreadPool().getThreadContext(), - client(), writableRegistry()); - } - - public void testEncodeSearchResponse() throws IOException { - for (int i = 0; i < 10; i++) { - AsyncSearchResponse response = randomAsyncSearchResponse(randomSearchId(), randomSearchResponse()); - String encoded = indexService.encodeResponse(response); - AsyncSearchResponse same = indexService.decodeResponse(encoded, response.getExpirationTime()); - assertEqualResponses(response, same); - } + indexService = new AsyncTaskIndexService<>(index, clusterService, + transportService.getThreadPool().getThreadContext(), + client(), "test_origin", AsyncSearchResponse::new, writableRegistry()); } public void testEnsuredAuthenticatedUserIsSame() throws IOException { @@ -111,8 +100,8 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase { indexService.createIndexIfNecessary(future); future.get(); GetIndexResponse getIndexResponse = client().admin().indices().getIndex( - new GetIndexRequest().indices(AsyncSearchIndexService.INDEX)).actionGet(); - Settings settings = getIndexResponse.getSettings().get(AsyncSearchIndexService.INDEX); + new GetIndexRequest().indices(index)).actionGet(); + Settings settings = getIndexResponse.getSettings().get(index); assertEquals("1", settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)); assertEquals("0-1", settings.get(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS)); }