Make AsyncSearchIndexService reusable (#55598)

EQL will require very similar functionality to async search. This PR refactors
AsyncSearchIndexService to make it reusable for EQL.

Supersedes #55119
Relates to #49638
This commit is contained in:
Igor Motov 2020-04-23 18:02:17 -04:00 committed by GitHub
parent 96a02089c2
commit 8c7ef2417f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 449 additions and 264 deletions

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.junit.Before;
import java.io.IOException;
@ -28,7 +29,7 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
import static org.elasticsearch.xpack.search.AsyncSearch.INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -81,7 +82,7 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404));
// other and user cannot access the result from direct get calls
AsyncSearchId searchId = AsyncSearchId.decode(id);
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
for (String runAs : new String[] {user, other}) {
exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));

View File

@ -29,6 +29,8 @@ import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
@ -39,9 +41,11 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
public final class AsyncSearch extends Plugin implements ActionPlugin {
public static final String INDEX = ".async-search";
private final Settings settings;
public AsyncSearch(Settings settings) {
@ -83,8 +87,9 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
Supplier<RepositoriesService> repositoriesServiceSupplier) {
if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncSearchIndexService indexService =
new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry);
AsyncTaskIndexService<AsyncSearchResponse> indexService =
new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN,
AsyncSearchResponse::new, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService);
clusterService.addListener(maintenanceService);

View File

@ -6,35 +6,14 @@
package org.elasticsearch.xpack.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
/**
* A service that runs a periodic cleanup over the async-search index.
*/
class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class);
public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService {
/**
* Controls the interval at which the cleanup is scheduled.
@ -45,91 +24,10 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);
private final String localNodeId;
private final ThreadPool threadPool;
private final AsyncSearchIndexService indexService;
private final TimeValue delay;
private boolean isCleanupRunning;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile Scheduler.Cancellable cancellable;
AsyncSearchMaintenanceService(String localNodeId,
Settings nodeSettings,
ThreadPool threadPool,
AsyncSearchIndexService indexService) {
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// Wait until the gateway has recovered from disk.
return;
}
tryStartCleanup(state);
}
synchronized void tryStartCleanup(ClusterState state) {
if (isClosed.get()) {
return;
}
IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX);
if (indexRouting == null) {
stop();
return;
}
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
if (localNodeId.equals(primaryNodeId)) {
if (isCleanupRunning == false) {
isCleanupRunning = true;
executeNextCleanup();
}
} else {
stop();
}
}
synchronized void executeNextCleanup() {
if (isClosed.get() == false && isCleanupRunning) {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClient()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup()));
}
}
synchronized void scheduleNextCleanup() {
if (isClosed.get() == false && isCleanupRunning) {
try {
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("failed to schedule next maintenance task; shutting down", e);
} else {
throw e;
}
}
}
}
synchronized void stop() {
if (isCleanupRunning) {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
isCleanupRunning = false;
}
}
@Override
public void close() {
stop();
isClosed.compareAndSet(false, true);
AsyncTaskIndexService<?> indexService) {
super(AsyncSearch.INDEX, localNodeId, threadPool, indexService, ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings));
}
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import java.util.ArrayList;
@ -42,9 +44,9 @@ import java.util.function.Supplier;
/**
* Task that tracks the progress of a currently running {@link SearchRequest}.
*/
final class AsyncSearchTask extends SearchTask {
final class AsyncSearchTask extends SearchTask implements AsyncTask {
private final BooleanSupplier checkSubmitCancellation;
private final AsyncSearchId searchId;
private final AsyncExecutionId searchId;
private final Client client;
private final ThreadPool threadPool;
private final Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier;
@ -73,7 +75,7 @@ final class AsyncSearchTask extends SearchTask {
* @param checkSubmitCancellation A boolean supplier that checks if the submit task has been cancelled.
* @param originHeaders All the request context headers.
* @param taskHeaders The filtered request headers for the task.
* @param searchId The {@link AsyncSearchId} of the task.
* @param searchId The {@link AsyncExecutionId} of the task.
* @param threadPool The threadPool to schedule runnable.
* @param aggReduceContextSupplier A supplier to create final reduce contexts.
*/
@ -85,7 +87,7 @@ final class AsyncSearchTask extends SearchTask {
TimeValue keepAlive,
Map<String, String> originHeaders,
Map<String, String> taskHeaders,
AsyncSearchId searchId,
AsyncExecutionId searchId,
Client client,
ThreadPool threadPool,
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
@ -104,14 +106,16 @@ final class AsyncSearchTask extends SearchTask {
/**
* Returns all of the request contexts headers
*/
Map<String, String> getOriginHeaders() {
@Override
public Map<String, String> getOriginHeaders() {
return originHeaders;
}
/**
* Returns the {@link AsyncSearchId} of the task
* Returns the {@link AsyncExecutionId} of the task
*/
AsyncSearchId getSearchId() {
@Override
public AsyncExecutionId getExecutionId() {
return searchId;
}

View File

@ -28,7 +28,7 @@ import java.util.function.Supplier;
import static java.util.Collections.singletonList;
import static org.apache.lucene.search.TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
import static org.elasticsearch.search.aggregations.InternalAggregations.topLevelReduce;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.restoreResponseHeadersContext;
import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreResponseHeadersContext;
/**
* A mutable search response that allows to update and create partial response synchronously.
@ -156,7 +156,7 @@ class MutableSearchResponse {
} else {
resp = null;
}
return new AsyncSearchResponse(task.getSearchId().getEncoded(), resp, failure, isPartial,
return new AsyncSearchResponse(task.getExecutionId().getEncoded(), resp, failure, isPartial,
frozen == false, task.getStartTime(), expirationTime);
}

View File

@ -26,16 +26,21 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import java.io.IOException;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
public class TransportDeleteAsyncSearchAction extends HandledTransportAction<DeleteAsyncSearchAction.Request, AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class);
private final ClusterService clusterService;
private final TransportService transportService;
private final AsyncSearchIndexService store;
private final AsyncTaskIndexService<AsyncSearchResponse> store;
@Inject
public TransportDeleteAsyncSearchAction(TransportService transportService,
@ -45,7 +50,8 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction<Del
NamedWriteableRegistry registry,
Client client) {
super(DeleteAsyncSearchAction.NAME, transportService, actionFilters, DeleteAsyncSearchAction.Request::new);
this.store = new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, registry);
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
this.clusterService = clusterService;
this.transportService = transportService;
}
@ -53,7 +59,7 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction<Del
@Override
protected void doExecute(Task task, DeleteAsyncSearchAction.Request request, ActionListener<AcknowledgedResponse> listener) {
try {
AsyncSearchId searchId = AsyncSearchId.decode(request.getId());
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) {
cancelTaskAndDeleteResult(searchId, listener);
@ -67,8 +73,8 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction<Del
}
}
void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener<AcknowledgedResponse> listener) throws IOException {
AsyncSearchTask task = store.getTask(taskManager, searchId);
void cancelTaskAndDeleteResult(AsyncExecutionId searchId, ActionListener<AcknowledgedResponse> listener) throws IOException {
AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class);
if (task != null) {
//the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways.
task.cancelTask(() -> store.deleteResponse(searchId,

View File

@ -24,14 +24,18 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncSearchAction.Request, AsyncSearchResponse> {
private final Logger logger = LogManager.getLogger(TransportGetAsyncSearchAction.class);
private final ClusterService clusterService;
private final TransportService transportService;
private final AsyncSearchIndexService store;
private final AsyncTaskIndexService<AsyncSearchResponse> store;
@Inject
public TransportGetAsyncSearchAction(TransportService transportService,
@ -43,14 +47,15 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncSearchAction.Request::new);
this.clusterService = clusterService;
this.transportService = transportService;
this.store = new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, registry);
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
}
@Override
protected void doExecute(Task task, GetAsyncSearchAction.Request request, ActionListener<AsyncSearchResponse> listener) {
try {
long nowInMillis = System.currentTimeMillis();
AsyncSearchId searchId = AsyncSearchId.decode(request.getId());
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) {
if (request.getKeepAlive().getMillis() > 0) {
@ -82,13 +87,13 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
}
}
private void getSearchResponseFromTask(AsyncSearchId searchId,
private void getSearchResponseFromTask(AsyncExecutionId searchId,
GetAsyncSearchAction.Request request,
long nowInMillis,
long expirationTimeMillis,
ActionListener<AsyncSearchResponse> listener) {
try {
final AsyncSearchTask task = store.getTask(taskManager, searchId);
final AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class);
if (task == null) {
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
return;
@ -118,7 +123,7 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
}
}
private void getSearchResponseFromIndex(AsyncSearchId searchId,
private void getSearchResponseFromIndex(AsyncExecutionId searchId,
GetAsyncSearchAction.Request request,
long nowInMillis,
ActionListener<AsyncSearchResponse> listener) {

View File

@ -34,6 +34,8 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
@ -42,6 +44,8 @@ import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
private static final Logger logger = LogManager.getLogger(TransportSubmitAsyncSearchAction.class);
@ -49,7 +53,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
private final Function<SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
private final TransportSearchAction searchAction;
private final ThreadContext threadContext;
private final AsyncSearchIndexService store;
private final AsyncTaskIndexService<AsyncSearchResponse> store;
@Inject
public TransportSubmitAsyncSearchAction(ClusterService clusterService,
@ -65,7 +69,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
this.searchAction = searchAction;
this.threadContext = transportService.getThreadPool().getThreadContext();
this.store = new AsyncSearchIndexService(clusterService, threadContext, client, registry);
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadContext, client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
}
@Override
@ -88,7 +93,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
onFatalFailure(searchTask, cause, searchResponse.isRunning(), submitListener);
} else {
final String docId = searchTask.getSearchId().getDocId();
final String docId = searchTask.getExecutionId().getDocId();
// creates the fallback response if the node crashes/restarts in the middle of the request
// TODO: store intermediate results ?
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
@ -140,7 +145,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) {
@Override
public AsyncSearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> taskHeaders) {
AsyncSearchId searchId = new AsyncSearchId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId,
@ -176,29 +181,30 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
Runnable nextAction) {
if (submitTask.isCancelled() || searchTask.isCancelled()) {
// the task was cancelled so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getSearchId(), ActionListener.wrap(
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getSearchId()), exc);
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getExecutionId()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
return;
}
try {
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
store.storeFinalResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
if (cause instanceof DocumentMissingException == false &&
cause instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
searchTask.getExecutionId().getEncoded()), exc);
}
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
} catch (Exception exc) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc);
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()),
exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}
}

View File

@ -364,7 +364,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
assertThat(response.getExpirationTime(), greaterThan(now));
// remove the async search index
client().admin().indices().prepareDelete(AsyncSearchIndexService.INDEX).get();
client().admin().indices().prepareDelete(AsyncSearch.INDEX).get();
Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId()));
Throwable cause = exc instanceof ExecutionException ?

View File

@ -27,6 +27,7 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
@ -49,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
import static org.elasticsearch.xpack.search.AsyncSearch.INDEX;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -83,10 +84,10 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
}
/**
* Restart the node that runs the {@link TaskId} decoded from the provided {@link AsyncSearchId}.
* Restart the node that runs the {@link TaskId} decoded from the provided {@link AsyncExecutionId}.
*/
protected void restartTaskNode(String id) throws Exception {
AsyncSearchId searchId = AsyncSearchId.decode(id);
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
final ClusterStateResponse clusterState = client().admin().cluster()
.prepareState().clear().setNodes(true).get();
DiscoveryNode node = clusterState.getState().nodes().get(searchId.getTaskId().getNodeId());
@ -116,10 +117,10 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
}
/**
* Wait the removal of the document decoded from the provided {@link AsyncSearchId}.
* Wait the removal of the document decoded from the provided {@link AsyncExecutionId}.
*/
protected void ensureTaskRemoval(String id) throws Exception {
AsyncSearchId searchId = AsyncSearchId.decode(id);
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
assertBusy(() -> {
GetResponse resp = client().prepareGet()
.setIndex(INDEX)
@ -143,11 +144,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
}
/**
* Wait the completion of the {@link TaskId} decoded from the provided {@link AsyncSearchId}.
* Wait the completion of the {@link TaskId} decoded from the provided {@link AsyncExecutionId}.
*/
protected void ensureTaskCompletion(String id) throws Exception {
assertBusy(() -> {
TaskId taskId = AsyncSearchId.decode(id).getTaskId();
TaskId taskId = AsyncExecutionId.decode(id).getTaskId();
try {
GetTaskResponse resp = client().admin().cluster()
.prepareGetTask(taskId).get();

View File

@ -21,6 +21,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.junit.After;
import org.junit.Before;
@ -66,7 +67,7 @@ public class AsyncSearchTaskTests extends ESTestCase {
public void testWaitForInit() throws InterruptedException {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
int numShards = randomIntBetween(0, 10);
List<SearchShard> shards = new ArrayList<>();
@ -106,7 +107,7 @@ public class AsyncSearchTaskTests extends ESTestCase {
public void testWithFailure() throws InterruptedException {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
int numThreads = randomIntBetween(1, 10);
CountDownLatch latch = new CountDownLatch(numThreads);
@ -134,7 +135,7 @@ public class AsyncSearchTaskTests extends ESTestCase {
public void testWaitForCompletion() throws InterruptedException {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
int numShards = randomIntBetween(0, 10);
List<SearchShard> shards = new ArrayList<>();

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase<GetAsyncSearchAction.Request> {
@ -31,7 +32,7 @@ public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase<
}
static String randomSearchId() {
return AsyncSearchId.encode(UUIDs.randomBase64UUID(),
return AsyncExecutionId.encode(UUIDs.randomBase64UUID(),
new TaskId(randomAlphaOfLengthBetween(10, 20), randomLongBetween(0, Long.MAX_VALUE)));
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
package org.elasticsearch.xpack.core.async;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
@ -17,14 +17,14 @@ import java.util.Base64;
import java.util.Objects;
/**
* A class that contains all information related to a submitted async search.
* A class that contains all information related to a submitted async execution.
*/
class AsyncSearchId {
public final class AsyncExecutionId {
private final String docId;
private final TaskId taskId;
private final String encoded;
AsyncSearchId(String docId, TaskId taskId) {
public AsyncExecutionId(String docId, TaskId taskId) {
this.docId = docId;
this.taskId = taskId;
this.encoded = encode(docId, taskId);
@ -33,21 +33,21 @@ class AsyncSearchId {
/**
* The document id of the response in the index if the task is not running.
*/
String getDocId() {
public String getDocId() {
return docId;
}
/**
* The {@link TaskId} of the async search in the task manager.
* The {@link TaskId} of the async execution in the task manager.
*/
TaskId getTaskId() {
public TaskId getTaskId() {
return taskId;
}
/**
* Gets the encoded string that represents this search.
* Gets the encoded string that represents this execution.
*/
String getEncoded() {
public String getEncoded() {
return encoded;
}
@ -55,7 +55,7 @@ class AsyncSearchId {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AsyncSearchId searchId = (AsyncSearchId) o;
AsyncExecutionId searchId = (AsyncExecutionId) o;
return docId.equals(searchId.docId) &&
taskId.equals(searchId.taskId);
}
@ -67,17 +67,17 @@ class AsyncSearchId {
@Override
public String toString() {
return "AsyncSearchId{" +
return "AsyncExecutionId{" +
"docId='" + docId + '\'' +
", taskId=" + taskId +
'}';
}
/**
* Encodes the informations needed to retrieve a async search response
* Encodes the informations needed to retrieve a async response
* in a base64 encoded string.
*/
static String encode(String docId, TaskId taskId) {
public static String encode(String docId, TaskId taskId) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeString(docId);
out.writeString(taskId.toString());
@ -88,13 +88,13 @@ class AsyncSearchId {
}
/**
* Decodes a base64 encoded string into an {@link AsyncSearchId} that can be used
* to retrieve the response of an async search.
* Decodes a base64 encoded string into an {@link AsyncExecutionId} that can be used
* to retrieve the response of an async execution.
*/
static AsyncSearchId decode(String id) {
final AsyncSearchId searchId;
public static AsyncExecutionId decode(String id) {
final AsyncExecutionId searchId;
try (StreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getUrlDecoder().decode(id)))) {
searchId = new AsyncSearchId(in.readString(), new TaskId(in.readString()));
searchId = new AsyncExecutionId(in.readString(), new TaskId(in.readString()));
if (in.available() > 0) {
throw new IllegalArgumentException("invalid id:[" + id + "]");
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.async;
import org.elasticsearch.common.io.stream.Writeable;
public interface AsyncResponse<T extends AsyncResponse<?>> extends Writeable {
/**
* When this response will expire as a timestamp in milliseconds since epoch.
*/
long getExpirationTime();
/**
* Returns a copy of this object with a new expiration time
*/
T withExpirationTime(long expirationTimeMillis);
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.async;
import java.util.Map;
/**
* A task that supports asynchronous execution and provides information necessary for safe temporary storage of results
*/
public interface AsyncTask {
/**
* Returns all of the request contexts headers
*/
Map<String, String> getOriginHeaders();
/**
* Returns the {@link AsyncExecutionId} of the task
*/
AsyncExecutionId getExecutionId();
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
package org.elasticsearch.xpack.core.async;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -50,16 +51,13 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY;
/**
* A service that exposes the CRUD operations for the async-search index.
* A service that exposes the CRUD operations for the async task-specific index.
*/
class AsyncSearchIndexService {
private static final Logger logger = LogManager.getLogger(AsyncSearchIndexService.class);
public static final String INDEX = ".async-search";
public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class);
public static final String HEADERS_FIELD = "headers";
public static final String RESPONSE_HEADERS_FIELD = "response_headers";
@ -103,25 +101,33 @@ class AsyncSearchIndexService {
return builder;
}
private final String index;
private final ClusterService clusterService;
private final Client client;
private final SecurityContext securityContext;
private final NamedWriteableRegistry registry;
private final Writeable.Reader<R> reader;
AsyncSearchIndexService(ClusterService clusterService,
ThreadContext threadContext,
Client client,
NamedWriteableRegistry registry) {
public AsyncTaskIndexService(String index,
ClusterService clusterService,
ThreadContext threadContext,
Client client,
String origin,
Writeable.Reader<R> reader,
NamedWriteableRegistry registry) {
this.index = index;
this.clusterService = clusterService;
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
this.client = new OriginSettingClient(client, ASYNC_SEARCH_ORIGIN);
this.client = new OriginSettingClient(client, origin);
this.registry = registry;
this.reader = reader;
}
/**
* Returns the internal client with origin.
*/
Client getClient() {
public Client getClient() {
return client;
}
@ -129,9 +135,9 @@ class AsyncSearchIndexService {
* Creates the index with the expected settings and mappings if it doesn't exist.
*/
void createIndexIfNecessary(ActionListener<Void> listener) {
if (clusterService.state().routingTable().hasIndex(AsyncSearchIndexService.INDEX) == false) {
if (clusterService.state().routingTable().hasIndex(index) == false) {
try {
client.admin().indices().prepareCreate(INDEX)
client.admin().indices().prepareCreate(index)
.setSettings(settings())
.addMapping(SINGLE_MAPPING_NAME, mappings())
.execute(ActionListener.wrap(
@ -140,12 +146,12 @@ class AsyncSearchIndexService {
if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) {
listener.onResponse(null);
} else {
logger.error("failed to create async-search index", exc);
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
}));
} catch (Exception exc) {
logger.error("failed to create async-search index", exc);
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
} else {
@ -157,15 +163,15 @@ class AsyncSearchIndexService {
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
*/
void storeInitialResponse(String docId,
public void storeInitialResponse(String docId,
Map<String, String> headers,
AsyncSearchResponse response,
R response,
ActionListener<IndexResponse> listener) throws IOException {
Map<String, Object> source = new HashMap<>();
source.put(HEADERS_FIELD, headers);
source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime());
source.put(RESULT_FIELD, encodeResponse(response));
IndexRequest indexRequest = new IndexRequest(INDEX)
IndexRequest indexRequest = new IndexRequest(index)
.create(true)
.id(docId)
.source(source, XContentType.JSON);
@ -175,15 +181,15 @@ class AsyncSearchIndexService {
/**
* Stores the final response if the place-holder document is still present (update).
*/
void storeFinalResponse(String docId,
public void storeFinalResponse(String docId,
Map<String, List<String>> responseHeaders,
AsyncSearchResponse response,
R response,
ActionListener<UpdateResponse> listener) throws IOException {
Map<String, Object> source = new HashMap<>();
source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
source.put(RESULT_FIELD, encodeResponse(response));
UpdateRequest request = new UpdateRequest()
.index(INDEX)
.index(index)
.id(docId)
.doc(source, XContentType.JSON);
client.update(request, listener);
@ -193,48 +199,48 @@ class AsyncSearchIndexService {
* Updates the expiration time of the provided <code>docId</code> if the place-holder
* document is still present (update).
*/
void updateExpirationTime(String docId,
public void updateExpirationTime(String docId,
long expirationTimeMillis,
ActionListener<UpdateResponse> listener) {
Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
UpdateRequest request = new UpdateRequest().index(INDEX)
UpdateRequest request = new UpdateRequest().index(index)
.id(docId)
.doc(source, XContentType.JSON);
client.update(request, listener);
}
/**
* Deletes the provided <code>searchId</code> from the index if present.
* Deletes the provided <code>asyncTaskId</code> from the index if present.
*/
void deleteResponse(AsyncSearchId searchId,
ActionListener<DeleteResponse> listener) {
DeleteRequest request = new DeleteRequest(INDEX).id(searchId.getDocId());
public void deleteResponse(AsyncExecutionId asyncExecutionId,
ActionListener<DeleteResponse> listener) {
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
client.delete(request, listener);
}
/**
* Returns the {@link AsyncSearchTask} if the provided <code>searchId</code>
* Returns the {@link AsyncTask} if the provided <code>asyncTaskId</code>
* is registered in the task manager, <code>null</code> otherwise.
*
* This method throws a {@link ResourceNotFoundException} if the authenticated user
* is not the creator of the original task.
*/
AsyncSearchTask getTask(TaskManager taskManager, AsyncSearchId searchId) throws IOException {
Task task = taskManager.getTask(searchId.getTaskId().getId());
if (task instanceof AsyncSearchTask == false) {
public <T extends AsyncTask> T getTask(TaskManager taskManager, AsyncExecutionId asyncExecutionId, Class<T> tClass) throws IOException {
Task task = taskManager.getTask(asyncExecutionId.getTaskId().getId());
if (tClass.isInstance(task) == false) {
return null;
}
AsyncSearchTask searchTask = (AsyncSearchTask) task;
if (searchTask.getSearchId().equals(searchId) == false) {
@SuppressWarnings("unchecked") T asyncTask = (T) task;
if (asyncTask.getExecutionId().equals(asyncExecutionId) == false) {
return null;
}
// Check authentication for the user
final Authentication auth = securityContext.getAuthentication();
if (ensureAuthenticatedUserIsSame(searchTask.getOriginHeaders(), auth) == false) {
throw new ResourceNotFoundException(searchId.getEncoded() + " not found");
if (ensureAuthenticatedUserIsSame(asyncTask.getOriginHeaders(), auth) == false) {
throw new ResourceNotFoundException(asyncExecutionId.getEncoded() + " not found");
}
return searchTask;
return asyncTask;
}
/**
@ -243,25 +249,25 @@ class AsyncSearchIndexService {
* When the provided <code>restoreResponseHeaders</code> is <code>true</code>, this method also restores the
* response headers of the original request in the current thread context.
*/
void getResponse(AsyncSearchId searchId,
boolean restoreResponseHeaders,
ActionListener<AsyncSearchResponse> listener) {
public void getResponse(AsyncExecutionId asyncExecutionId,
boolean restoreResponseHeaders,
ActionListener<R> listener) {
final Authentication current = securityContext.getAuthentication();
GetRequest internalGet = new GetRequest(INDEX)
.preference(searchId.getEncoded())
.id(searchId.getDocId());
GetRequest internalGet = new GetRequest(index)
.preference(asyncExecutionId.getEncoded())
.id(asyncExecutionId.getDocId());
client.get(internalGet, ActionListener.wrap(
get -> {
if (get.isExists() == false) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded()));
return;
}
// check the authentication of the current user against the user that initiated the async search
// check the authentication of the current user against the user that initiated the async task
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>) get.getSource().get(HEADERS_FIELD);
if (ensureAuthenticatedUserIsSame(headers, current) == false) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded()));
return;
}
@ -273,8 +279,11 @@ class AsyncSearchIndexService {
long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD);
String encoded = (String) get.getSource().get(RESULT_FIELD);
AsyncSearchResponse response = decodeResponse(encoded, expirationTime);
listener.onResponse(encoded != null ? response : null);
if (encoded != null) {
listener.onResponse(decodeResponse(encoded).withExpirationTime(expirationTime));
} else {
listener.onResponse(null);
}
},
listener::onFailure
));
@ -299,7 +308,7 @@ class AsyncSearchIndexService {
}
/**
* Compares the {@link Authentication} that was used to create the {@link AsyncSearchId} with the
* Compares the {@link Authentication} that was used to create the {@link AsyncExecutionId} with the
* current authentication.
*/
boolean ensureAuthenticatedUserIsSame(Authentication original, Authentication current) {
@ -322,7 +331,7 @@ class AsyncSearchIndexService {
/**
* Encode the provided response in a binary form using base64 encoding.
*/
String encodeResponse(AsyncSearchResponse response) throws IOException {
String encodeResponse(R response) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
Version.writeVersion(Version.CURRENT, out);
response.writeTo(out);
@ -333,11 +342,11 @@ class AsyncSearchIndexService {
/**
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
*/
AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException {
R decodeResponse(String value) throws IOException {
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
return new AsyncSearchResponse(in, expirationTime);
return reader.read(in);
}
}
}
@ -345,7 +354,7 @@ class AsyncSearchIndexService {
/**
* Restores the provided <code>responseHeaders</code> to the current thread context.
*/
static void restoreResponseHeadersContext(ThreadContext threadContext, Map<String, List<String>> responseHeaders) {
public static void restoreResponseHeadersContext(ThreadContext threadContext, Map<String, List<String>> responseHeaders) {
for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
for (String value : entry.getValue()) {
threadContext.addResponseHeader(entry.getKey(), value);

View File

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.async;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATION_TIME_FIELD;
/**
* A service that runs a periodic cleanup over the async execution index.
* <p>
* Since we will have several injected implementation of this class injected into different transports, and we bind components created
* by {@linkplain org.elasticsearch.plugins.Plugin#createComponents} to their classes, we need to implement one class per binding.
*/
public abstract class AsyncTaskMaintenanceService implements Releasable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AsyncTaskMaintenanceService.class);
private final String index;
private final String localNodeId;
private final ThreadPool threadPool;
private final AsyncTaskIndexService<?> indexService;
private final TimeValue delay;
private boolean isCleanupRunning;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile Scheduler.Cancellable cancellable;
public AsyncTaskMaintenanceService(String index,
String localNodeId,
ThreadPool threadPool,
AsyncTaskIndexService<?> indexService,
TimeValue delay) {
this.index = index;
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.delay = delay;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// Wait until the gateway has recovered from disk.
return;
}
tryStartCleanup(state);
}
synchronized void tryStartCleanup(ClusterState state) {
if (isClosed.get()) {
return;
}
IndexRoutingTable indexRouting = state.routingTable().index(index);
if (indexRouting == null) {
stop();
return;
}
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
if (localNodeId.equals(primaryNodeId)) {
if (isCleanupRunning == false) {
isCleanupRunning = true;
executeNextCleanup();
}
} else {
stop();
}
}
synchronized void executeNextCleanup() {
if (isClosed.get() == false && isCleanupRunning) {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClient()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup));
}
}
synchronized void scheduleNextCleanup() {
if (isClosed.get() == false && isCleanupRunning) {
try {
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("failed to schedule next maintenance task; shutting down", e);
} else {
throw e;
}
}
}
}
synchronized void stop() {
if (isCleanupRunning) {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
isCleanupRunning = false;
}
}
@Override
public void close() {
stop();
isClosed.compareAndSet(false, true);
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.async.AsyncResponse;
import java.io.IOException;
@ -22,7 +23,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
/**
* A response of an async search request.
*/
public class AsyncSearchResponse extends ActionResponse implements StatusToXContentObject {
public class AsyncSearchResponse extends ActionResponse implements StatusToXContentObject, AsyncResponse<AsyncSearchResponse> {
@Nullable
private final String id;
@Nullable
@ -33,7 +34,7 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
private final boolean isPartial;
private final long startTimeMillis;
private long expirationTimeMillis;
private final long expirationTimeMillis;
/**
* Creates an {@link AsyncSearchResponse} with meta-information only (not-modified).
@ -74,18 +75,13 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
}
public AsyncSearchResponse(StreamInput in) throws IOException {
this(in, null);
}
public AsyncSearchResponse(StreamInput in, Long expirationTime) throws IOException {
this.id = in.readOptionalString();
this.error = in.readOptionalWriteable(ElasticsearchException::new);
this.searchResponse = in.readOptionalWriteable(SearchResponse::new);
this.isPartial = in.readBoolean();
this.isRunning = in.readBoolean();
this.startTimeMillis = in.readLong();
long origExpiration = in.readLong();
this.expirationTimeMillis = expirationTime == null ? origExpiration : expirationTime;
this.expirationTimeMillis = in.readLong();
}
@Override
@ -158,12 +154,14 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
/**
* When this response will expired as a timestamp in milliseconds since epoch.
*/
@Override
public long getExpirationTime() {
return expirationTimeMillis;
}
public void setExpirationTime(long expirationTimeMillis) {
this.expirationTimeMillis = expirationTimeMillis;
@Override
public AsyncSearchResponse withExpirationTime(long expirationTimeMillis) {
return new AsyncSearchResponse(id, searchResponse, error, isPartial, isRunning, startTimeMillis, expirationTimeMillis);
}
@Override

View File

@ -4,35 +4,35 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
package org.elasticsearch.xpack.core.async;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
public class AsyncSearchIdTests extends ESTestCase {
public class AsyncExecutionIdTests extends ESTestCase {
public void testEncode() {
for (int i = 0; i < 10; i++) {
AsyncSearchId instance = new AsyncSearchId(UUIDs.randomBase64UUID(),
AsyncExecutionId instance = new AsyncExecutionId(UUIDs.randomBase64UUID(),
new TaskId(randomAlphaOfLengthBetween(5, 20), randomNonNegativeLong()));
String encoded = AsyncSearchId.encode(instance.getDocId(), instance.getTaskId());
AsyncSearchId same = AsyncSearchId.decode(encoded);
String encoded = AsyncExecutionId.encode(instance.getDocId(), instance.getTaskId());
AsyncExecutionId same = AsyncExecutionId.decode(encoded);
assertEquals(same, instance);
AsyncSearchId mutate = mutate(instance);
AsyncExecutionId mutate = mutate(instance);
assertNotEquals(mutate, instance);
assertNotEquals(mutate, same);
}
}
private AsyncSearchId mutate(AsyncSearchId id) {
private AsyncExecutionId mutate(AsyncExecutionId id) {
int rand = randomIntBetween(0, 1);
switch (rand) {
case 0:
return new AsyncSearchId(randomAlphaOfLength(id.getDocId().length()+1), id.getTaskId());
return new AsyncExecutionId(randomAlphaOfLength(id.getDocId().length()+1), id.getTaskId());
case 1:
return new AsyncSearchId(id.getDocId(),
return new AsyncExecutionId(id.getDocId(),
new TaskId(randomAlphaOfLength(id.getTaskId().getNodeId().length()), randomNonNegativeLong()));
default:

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.async;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.hamcrest.Matchers.equalTo;
// TODO: test CRUD operations
public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
private AsyncTaskIndexService<TestAsyncResponse> indexService;
public static class TestAsyncResponse implements AsyncResponse<TestAsyncResponse> {
private final String test;
private final long expirationTimeMillis;
public TestAsyncResponse(String test, long expirationTimeMillis) {
this.test = test;
this.expirationTimeMillis = expirationTimeMillis;
}
public TestAsyncResponse(StreamInput input) throws IOException {
test = input.readOptionalString();
this.expirationTimeMillis = input.readLong();
}
@Override
public long getExpirationTime() {
return 0;
}
@Override
public TestAsyncResponse withExpirationTime(long expirationTime) {
return new TestAsyncResponse(test, expirationTime);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(test);
out.writeLong(expirationTimeMillis);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestAsyncResponse that = (TestAsyncResponse) o;
return expirationTimeMillis == that.expirationTimeMillis &&
Objects.equals(test, that.test);
}
@Override
public int hashCode() {
return Objects.hash(test, expirationTimeMillis);
}
}
@Before
public void setup() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
TransportService transportService = getInstanceFromNode(TransportService.class);
indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(),
client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry());
}
public void testEncodeSearchResponse() throws IOException {
for (int i = 0; i < 10; i++) {
TestAsyncResponse response = new TestAsyncResponse(randomAlphaOfLength(10), randomLong());
String encoded = indexService.encodeResponse(response);
TestAsyncResponse same = indexService.decodeResponse(encoded);
assertThat(same, equalTo(response));
}
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
package org.elasticsearch.xpack.core.async;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
@ -23,30 +23,19 @@ import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.assertEqualResponses;
import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.randomAsyncSearchResponse;
import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.randomSearchResponse;
import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId;
// TODO: test CRUD operations
public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
private AsyncSearchIndexService indexService;
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
private AsyncTaskIndexService<AsyncSearchResponse> indexService;
public String index = ".async-search";
@Before
public void setup() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
TransportService transportService = getInstanceFromNode(TransportService.class);
indexService = new AsyncSearchIndexService(clusterService, transportService.getThreadPool().getThreadContext(),
client(), writableRegistry());
}
public void testEncodeSearchResponse() throws IOException {
for (int i = 0; i < 10; i++) {
AsyncSearchResponse response = randomAsyncSearchResponse(randomSearchId(), randomSearchResponse());
String encoded = indexService.encodeResponse(response);
AsyncSearchResponse same = indexService.decodeResponse(encoded, response.getExpirationTime());
assertEqualResponses(response, same);
}
indexService = new AsyncTaskIndexService<>(index, clusterService,
transportService.getThreadPool().getThreadContext(),
client(), "test_origin", AsyncSearchResponse::new, writableRegistry());
}
public void testEnsuredAuthenticatedUserIsSame() throws IOException {
@ -111,8 +100,8 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
indexService.createIndexIfNecessary(future);
future.get();
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
new GetIndexRequest().indices(AsyncSearchIndexService.INDEX)).actionGet();
Settings settings = getIndexResponse.getSettings().get(AsyncSearchIndexService.INDEX);
new GetIndexRequest().indices(index)).actionGet();
Settings settings = getIndexResponse.getSettings().get(index);
assertEquals("1", settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS));
assertEquals("0-1", settings.get(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS));
}