Removed ClientHelper dependency from PersistentTasksService.

This commit is contained in:
Martijn van Groningen 2018-01-25 14:59:49 +01:00
parent cc16f9d9c9
commit 07e727c769
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
1 changed files with 44 additions and 3 deletions

View File

@ -18,9 +18,14 @@
*/ */
package org.elasticsearch.persistent; package org.elasticsearch.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
@ -30,16 +35,16 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.function.BiConsumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.ClientHelper.PERSISTENT_TASK_ORIGIN;
import static org.elasticsearch.ClientHelper.executeAsyncWithOrigin;
/** /**
* This service is used by persistent actions to propagate changes in the action state and notify about completion * This service is used by persistent actions to propagate changes in the action state and notify about completion
@ -194,4 +199,40 @@ public class PersistentTasksService extends AbstractComponent {
onFailure(new IllegalStateException("timed out after " + timeout)); onFailure(new IllegalStateException("timed out after " + timeout));
} }
} }
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
/**
* Executes a consumer after setting the origin and wrapping the listener so that the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse> void executeAsyncWithOrigin(
ThreadContext threadContext, String origin, Request request, ActionListener<Response> listener,
BiConsumer<Request, ActionListener<Response>> consumer) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
consumer.accept(request, new ContextPreservingActionListener<>(supplier, listener));
}
}
/**
* Executes an asynchronous action using the provided client. The origin is set in the context and the listener
* is wrapped to ensure the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void executeAsyncWithOrigin(
Client client, String origin, Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
}
} }