Persistent Tasks: refactor PersistentTasksService to use ActionListener (elastic/x-pack-elasticsearch#937)
PersistentTasksService methods are not using ActionListener<PersistentTask<?>> instead of PersistentTaskOperationListener. Original commit: elastic/x-pack-elasticsearch@f95d8bda3d
This commit is contained in:
parent
bac8f010b4
commit
d11fbfa70c
|
@ -131,7 +131,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry;
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
|
@ -305,9 +304,9 @@ public class MachineLearning implements ActionPlugin {
|
|||
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager);
|
||||
|
||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
|
||||
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService,
|
||||
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, clusterService,
|
||||
autodetectProcessManager, auditor),
|
||||
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService,
|
||||
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState,
|
||||
datafeedManager, auditor)
|
||||
));
|
||||
|
||||
|
@ -423,7 +422,6 @@ public class MachineLearning implements ActionPlugin {
|
|||
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
|
||||
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),
|
||||
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
|
||||
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
|
||||
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
|
||||
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
|
||||
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
|
@ -276,9 +275,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
|
||||
private void forceCloseJob(long persistentTaskId, String jobId, ActionListener<Response> listener) {
|
||||
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
|
||||
persistentTasksService.removeTask(persistentTaskId, new PersistentTaskOperationListener() {
|
||||
persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
|
@ -301,9 +300,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
// so wait for that to happen here.
|
||||
void waitForJobClosed(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) {
|
||||
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
new WaitForPersistentTaskStatusListener<OpenJobAction.Request>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<OpenJobAction.Request> task) {
|
||||
logger.debug("finalizing job [{}]", request.getJobId());
|
||||
FinalizeJobExecutionAction.Request finalizeRequest =
|
||||
new FinalizeJobExecutionAction.Request(request.getJobId());
|
||||
|
|
|
@ -56,7 +56,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignme
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -307,10 +306,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
if (licenseState.isMachineLearningAllowed()) {
|
||||
PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() {
|
||||
ActionListener<PersistentTask<Request>> finalListener = new ActionListener<PersistentTask<Request>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
waitForJobStarted(taskId, request, listener);
|
||||
public void onResponse(PersistentTask<Request> task) {
|
||||
waitForJobStarted(task.getId(), request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -318,7 +317,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
persistentTasksService.createPersistentActionTask(NAME, request, finalListener);
|
||||
persistentTasksService.startPersistentTask(NAME, request, finalListener);
|
||||
} else {
|
||||
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
|
||||
}
|
||||
|
@ -327,9 +326,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
void waitForJobStarted(long taskId, Request request, ActionListener<Response> listener) {
|
||||
JobPredicate predicate = new JobPredicate();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
new WaitForPersistentTaskStatusListener<Request>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<Request> persistentTask) {
|
||||
listener.onResponse(new Response(predicate.opened));
|
||||
}
|
||||
|
||||
|
@ -377,9 +376,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
private volatile int maxConcurrentJobAllocations;
|
||||
|
||||
public OpenJobPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
PersistentTasksService persistentTasksService, ClusterService clusterService,
|
||||
AutodetectProcessManager autodetectProcessManager, Auditor auditor) {
|
||||
super(settings, NAME, persistentTasksService, ThreadPool.Names.MANAGEMENT);
|
||||
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager,
|
||||
Auditor auditor) {
|
||||
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
|
||||
this.licenseState = licenseState;
|
||||
this.autodetectProcessManager = autodetectProcessManager;
|
||||
this.auditor = auditor;
|
||||
|
|
|
@ -61,7 +61,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignme
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
|
@ -362,10 +361,10 @@ public class StartDatafeedAction
|
|||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
if (licenseState.isMachineLearningAllowed()) {
|
||||
PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() {
|
||||
ActionListener<PersistentTask<Request>> finalListener = new ActionListener<PersistentTask<Request>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
waitForYellow(taskId, request, listener);
|
||||
public void onResponse(PersistentTask<Request> persistentTask) {
|
||||
waitForYellow(persistentTask.getId(), request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -373,7 +372,7 @@ public class StartDatafeedAction
|
|||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
persistentTasksService.createPersistentActionTask(NAME, request, finalListener);
|
||||
persistentTasksService.startPersistentTask(NAME, request, finalListener);
|
||||
} else {
|
||||
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
|
||||
}
|
||||
|
@ -404,9 +403,9 @@ public class StartDatafeedAction
|
|||
return datafeedState == DatafeedState.STARTED;
|
||||
};
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
new WaitForPersistentTaskStatusListener<Request>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<Request> task) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
|
@ -425,9 +424,8 @@ public class StartDatafeedAction
|
|||
private final ThreadPool threadPool;
|
||||
|
||||
public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
PersistentTasksService persistentTasksService, DatafeedManager datafeedManager,
|
||||
Auditor auditor) {
|
||||
super(settings, NAME, persistentTasksService, ThreadPool.Names.MANAGEMENT);
|
||||
DatafeedManager datafeedManager, Auditor auditor) {
|
||||
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
|
||||
this.licenseState = licenseState;
|
||||
this.datafeedManager = datafeedManager;
|
||||
this.auditor = auditor;
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -256,9 +255,9 @@ public class StopDatafeedAction
|
|||
// so wait for that to happen here.
|
||||
void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) {
|
||||
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getTimeout(),
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
new WaitForPersistentTaskStatusListener<StartDatafeedAction.Request>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<StartDatafeedAction.Request> task) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
|
@ -270,9 +269,9 @@ public class StopDatafeedAction
|
|||
}
|
||||
|
||||
private void forceStopTask(long persistentTaskId, ActionListener<Response> listener) {
|
||||
persistentTasksService.removeTask(persistentTaskId, new PersistentTaskOperationListener() {
|
||||
persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
|
|
|
@ -32,8 +32,9 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
|||
import org.elasticsearch.xpack.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.ml.job.results.Result;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
@ -94,9 +95,9 @@ public class DatafeedManager extends AbstractComponent {
|
|||
}
|
||||
Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
|
||||
runningDatafeeds.put(datafeedId, holder);
|
||||
task.updatePersistentStatus(DatafeedState.STARTED, new PersistentTaskOperationListener() {
|
||||
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
innerRun(holder, task.getDatafeedStartTime(), task.getEndTime());
|
||||
}
|
||||
|
||||
|
@ -359,9 +360,9 @@ public class DatafeedManager extends AbstractComponent {
|
|||
|
||||
private void closeJob() {
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(20),
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
new WaitForPersistentTaskStatusListener<StartDatafeedAction.Request>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<StartDatafeedAction.Request> PersistentTask) {
|
||||
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
|
||||
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -43,7 +44,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
|
|||
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
|
||||
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -53,7 +54,6 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -349,9 +349,9 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void setJobState(JobTask jobTask, JobState state) {
|
||||
jobTask.updatePersistentStatus(state, new PersistentTaskOperationListener() {
|
||||
jobTask.updatePersistentStatus(state, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId());
|
||||
}
|
||||
|
||||
|
@ -363,9 +363,9 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
public void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
|
||||
jobTask.updatePersistentStatus(state, new PersistentTaskOperationListener() {
|
||||
jobTask.updatePersistentStatus(state, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
try {
|
||||
handler.accept(null);
|
||||
} catch (IOException e1) {
|
||||
|
|
|
@ -57,7 +57,7 @@ public class AllocatedPersistentTask extends CancellableTask {
|
|||
*
|
||||
* This doesn't affect the status of this allocated task.
|
||||
*/
|
||||
public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) {
|
||||
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,6 @@ import org.elasticsearch.action.Action;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
|
@ -24,8 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -35,7 +34,7 @@ import java.util.Objects;
|
|||
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
|
||||
*/
|
||||
public class CompletionPersistentTaskAction extends Action<CompletionPersistentTaskAction.Request,
|
||||
CompletionPersistentTaskAction.Response,
|
||||
PersistentTaskResponse,
|
||||
CompletionPersistentTaskAction.RequestBuilder> {
|
||||
|
||||
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
|
||||
|
@ -51,8 +50,8 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
public PersistentTaskResponse newResponse() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
public static class Request extends MasterNodeRequest<Request> {
|
||||
|
@ -104,49 +103,15 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
}
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse {
|
||||
public Response() {
|
||||
super();
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
AcknowledgedResponse that = (AcknowledgedResponse) o;
|
||||
return isAcknowledged() == that.isAcknowledged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(isAcknowledged());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CompletionPersistentTaskAction.Request,
|
||||
CompletionPersistentTaskAction.Response, CompletionPersistentTaskAction.RequestBuilder> {
|
||||
PersistentTaskResponse, CompletionPersistentTaskAction.RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, CompletionPersistentTaskAction action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
|
||||
|
||||
private final PersistentTasksClusterService persistentTasksClusterService;
|
||||
|
||||
|
@ -166,8 +131,8 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
protected PersistentTaskResponse newResponse() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,18 +142,19 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
|
|||
}
|
||||
|
||||
@Override
|
||||
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
|
||||
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
|
||||
@Override
|
||||
public void onResponse(Empty empty) {
|
||||
listener.onResponse(newResponse());
|
||||
}
|
||||
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<PersistentTaskResponse> listener) {
|
||||
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception,
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
listener.onResponse(new PersistentTaskResponse(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -185,10 +186,11 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
|
|||
protected final void masterOperation(final Request request, ClusterState state,
|
||||
final ActionListener<PersistentTaskResponse> listener) {
|
||||
persistentTasksClusterService.createPersistentTask(request.action, request.request,
|
||||
new ActionListener<Long>() {
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(Long newTaskId) {
|
||||
listener.onResponse(new PersistentTaskResponse(newTaskId));
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
listener.onResponse(new PersistentTaskResponse(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.persistent;
|
|||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -16,30 +17,30 @@ import java.util.Objects;
|
|||
* Response upon a successful start or an persistent task
|
||||
*/
|
||||
public class PersistentTaskResponse extends ActionResponse {
|
||||
private long taskId;
|
||||
private PersistentTask<?> task;
|
||||
|
||||
public PersistentTaskResponse() {
|
||||
super();
|
||||
}
|
||||
|
||||
public PersistentTaskResponse(long taskId) {
|
||||
this.taskId = taskId;
|
||||
public PersistentTaskResponse(PersistentTask<?> task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
taskId = in.readLong();
|
||||
task = in.readOptionalWriteable(PersistentTask::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeLong(taskId);
|
||||
out.writeOptionalWriteable(task);
|
||||
}
|
||||
|
||||
public long getTaskId() {
|
||||
return taskId;
|
||||
public PersistentTask<?> getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -47,11 +48,11 @@ public class PersistentTaskResponse extends ActionResponse {
|
|||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
PersistentTaskResponse that = (PersistentTaskResponse) o;
|
||||
return taskId == that.taskId;
|
||||
return Objects.equals(task, that.task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(taskId);
|
||||
return Objects.hash(task);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
* @param listener the listener that will be called when task is started
|
||||
*/
|
||||
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
|
||||
ActionListener<Long> listener) {
|
||||
ActionListener<PersistentTask<?>> listener) {
|
||||
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
|
@ -63,10 +63,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(
|
||||
((PersistentTasksCustomMetaData) newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)).getCurrentId());
|
||||
PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
if (tasks != null) {
|
||||
listener.onResponse(tasks.getTask(tasks.getCurrentId()));
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -79,7 +84,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
* @param failure the reason for restarting the task or null if the task completed successfully
|
||||
* @param listener the listener that will be called when task is removed
|
||||
*/
|
||||
public void completePersistentTask(long id, Exception failure, ActionListener<Empty> listener) {
|
||||
public void completePersistentTask(long id, Exception failure, ActionListener<PersistentTask<?>> listener) {
|
||||
final String source;
|
||||
if (failure != null) {
|
||||
logger.warn("persistent task " + id + " failed", failure);
|
||||
|
@ -108,38 +113,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(Empty.INSTANCE);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Switches the persistent task from stopped to started mode
|
||||
*
|
||||
* @param id the id of a persistent task
|
||||
* @param listener the listener that will be called when task is removed
|
||||
*/
|
||||
public void startPersistentTask(long id, ActionListener<Empty> listener) {
|
||||
clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
return update(currentState, tasksInProgress
|
||||
.assignTask(id, (action, request) -> getAssignement(action, currentState, request)));
|
||||
} else {
|
||||
throw new ResourceNotFoundException("the task with id {} doesn't exist", id);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(Empty.INSTANCE);
|
||||
// Using old state since in the new state the task is already gone
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(oldState, id));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -150,7 +125,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
* @param id the id of a persistent task
|
||||
* @param listener the listener that will be called when task is removed
|
||||
*/
|
||||
public void removePersistentTask(long id, ActionListener<Empty> listener) {
|
||||
public void removePersistentTask(long id, ActionListener<PersistentTask<?>> listener) {
|
||||
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
|
@ -169,7 +144,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(Empty.INSTANCE);
|
||||
// Using old state since in the new state the task is already gone
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(oldState, id));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -182,7 +158,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
* @param status new status
|
||||
* @param listener the listener that will be called when task is removed
|
||||
*/
|
||||
public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<Empty> listener) {
|
||||
public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
|
||||
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
|
@ -206,7 +182,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(Empty.INSTANCE);
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -268,7 +268,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private PersistentTask(StreamInput in) throws IOException {
|
||||
public PersistentTask(StreamInput in) throws IOException {
|
||||
id = in.readLong();
|
||||
allocationId = in.readLong();
|
||||
taskName = in.readString();
|
||||
|
|
|
@ -10,9 +10,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
@ -25,14 +23,11 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
|
|||
|
||||
private final String executor;
|
||||
private final String taskName;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
|
||||
protected PersistentTasksExecutor(Settings settings, String taskName, PersistentTasksService persistentTasksService,
|
||||
String executor) {
|
||||
protected PersistentTasksExecutor(Settings settings, String taskName, String executor) {
|
||||
super(settings);
|
||||
this.taskName = taskName;
|
||||
this.executor = executor;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
}
|
||||
|
||||
public String getTaskName() {
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -23,7 +24,6 @@ import org.elasticsearch.tasks.TaskCancelledException;
|
|||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -147,10 +147,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null) {
|
||||
if (task.markAsCancelled()) {
|
||||
persistentTasksService.sendCancellation(task.getId(), new PersistentTaskOperationListener() {
|
||||
persistentTasksService.sendCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.trace("Persistent task with id {} was cancelled", taskId);
|
||||
public void onResponse(CancelTasksResponse cancelTasksResponse) {
|
||||
logger.trace("Persistent task with id {} was cancelled", task.getId());
|
||||
|
||||
}
|
||||
|
||||
|
@ -240,7 +240,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
}
|
||||
|
||||
private class PublishedResponseListener implements PersistentTaskOperationListener {
|
||||
private class PublishedResponseListener implements ActionListener<PersistentTask<?>> {
|
||||
private final AllocatedPersistentTask task;
|
||||
|
||||
PublishedResponseListener(final AllocatedPersistentTask task) {
|
||||
|
@ -249,8 +249,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.trace("notification for task {} was successful", task.getPersistentTaskId());
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.trace("notification for task {} was successful", task.getId());
|
||||
if (task.markAsNotified() == false) {
|
||||
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -41,16 +42,15 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates the specified persistent action. The action is started unless the stopped parameter is equal to true.
|
||||
* If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion.
|
||||
* Otherwise it will remain there in the stopped state.
|
||||
* Creates the specified persistent task and attempts to assign it to a node.
|
||||
*/
|
||||
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
|
||||
PersistentTaskOperationListener listener) {
|
||||
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request);
|
||||
@SuppressWarnings("unchecked")
|
||||
public <Request extends PersistentTaskRequest> void startPersistentTask(String taskName, Request request,
|
||||
ActionListener<PersistentTask<Request>> listener) {
|
||||
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(taskName, request);
|
||||
try {
|
||||
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
|
||||
o -> listener.onResponse(o.getTaskId()), listener::onFailure));
|
||||
o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -58,13 +58,12 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
|
||||
/**
|
||||
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
|
||||
*
|
||||
*/
|
||||
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) {
|
||||
public void sendCompletionNotification(long taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
|
||||
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
|
||||
try {
|
||||
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
|
||||
listener::onFailure));
|
||||
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
|
||||
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -73,14 +72,13 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
/**
|
||||
* Cancels the persistent task.
|
||||
*/
|
||||
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) {
|
||||
void sendCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
|
||||
DiscoveryNode localNode = clusterService.localNode();
|
||||
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
|
||||
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
|
||||
cancelTasksRequest.setReason("persistent action was removed");
|
||||
try {
|
||||
client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
|
||||
listener::onFailure));
|
||||
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -88,28 +86,28 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
|
||||
/**
|
||||
* Updates status of the persistent task.
|
||||
*
|
||||
* <p>
|
||||
* Persistent task implementers shouldn't call this method directly and use
|
||||
* {@link AllocatedPersistentTask#updatePersistentStatus} instead
|
||||
*/
|
||||
void updateStatus(long taskId, long allocationId, Task.Status status, PersistentTaskOperationListener listener) {
|
||||
void updateStatus(long taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
|
||||
UpdatePersistentTaskStatusAction.Request updateStatusRequest =
|
||||
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
|
||||
try {
|
||||
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap(
|
||||
o -> listener.onResponse(taskId), listener::onFailure));
|
||||
o -> listener.onResponse(o.getTask()), listener::onFailure));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a persistent task
|
||||
* Cancels if needed and removes a persistent task
|
||||
*/
|
||||
public void removeTask(long taskId, PersistentTaskOperationListener listener) {
|
||||
public void cancelPersistentTask(long taskId, ActionListener<PersistentTask<?>> listener) {
|
||||
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
|
||||
try {
|
||||
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
|
||||
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()),
|
||||
listener::onFailure));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
|
@ -121,15 +119,15 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
* waits of it.
|
||||
*/
|
||||
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
|
||||
WaitForPersistentTaskStatusListener listener) {
|
||||
WaitForPersistentTaskStatusListener<?> listener) {
|
||||
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
|
||||
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
|
||||
listener.onResponse(taskId);
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId));
|
||||
} else {
|
||||
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
listener.onResponse(taskId);
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -145,15 +143,10 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {
|
||||
public interface WaitForPersistentTaskStatusListener<Request extends PersistentTaskRequest>
|
||||
extends ActionListener<PersistentTask<Request>> {
|
||||
default void onTimeout(TimeValue timeout) {
|
||||
onFailure(new IllegalStateException("timed out after " + timeout));
|
||||
}
|
||||
}
|
||||
|
||||
public interface PersistentTaskOperationListener {
|
||||
void onResponse(long taskId);
|
||||
void onFailure(Exception e);
|
||||
}
|
||||
|
||||
}
|
|
@ -26,12 +26,13 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class RemovePersistentTaskAction extends Action<RemovePersistentTaskAction.Request,
|
||||
RemovePersistentTaskAction.Response,
|
||||
PersistentTaskResponse,
|
||||
RemovePersistentTaskAction.RequestBuilder> {
|
||||
|
||||
public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction();
|
||||
|
@ -47,8 +48,8 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
|
|||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
public PersistentTaskResponse newResponse() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
public static class Request extends MasterNodeRequest<Request> {
|
||||
|
@ -98,42 +99,8 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
|
|||
}
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse {
|
||||
public Response() {
|
||||
super();
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
AcknowledgedResponse that = (AcknowledgedResponse) o;
|
||||
return isAcknowledged() == that.isAcknowledged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(isAcknowledged());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<RemovePersistentTaskAction.Request,
|
||||
RemovePersistentTaskAction.Response, RemovePersistentTaskAction.RequestBuilder> {
|
||||
PersistentTaskResponse, RemovePersistentTaskAction.RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, RemovePersistentTaskAction action) {
|
||||
super(client, action, new Request());
|
||||
|
@ -146,7 +113,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
|
|||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
|
||||
|
||||
private final PersistentTasksClusterService persistentTasksClusterService;
|
||||
|
||||
|
@ -166,8 +133,8 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
protected PersistentTaskResponse newResponse() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,11 +144,12 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
|
|||
}
|
||||
|
||||
@Override
|
||||
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
|
||||
persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener<Empty>() {
|
||||
protected final void masterOperation(final Request request, ClusterState state,
|
||||
final ActionListener<PersistentTaskResponse> listener) {
|
||||
persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(Empty empty) {
|
||||
listener.onResponse(new Response(true));
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
listener.onResponse(new PersistentTaskResponse(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,199 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* This action can be used to start a persistent task previously created using {@link CreatePersistentTaskAction}
|
||||
*/
|
||||
public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.Request,
|
||||
StartPersistentTaskAction.Response,
|
||||
StartPersistentTaskAction.RequestBuilder> {
|
||||
|
||||
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
|
||||
public static final String NAME = "cluster:admin/persistent/start";
|
||||
|
||||
private StartPersistentTaskAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RequestBuilder(client, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class Request extends MasterNodeRequest<Request> {
|
||||
|
||||
private long taskId;
|
||||
|
||||
public Request() {
|
||||
|
||||
}
|
||||
|
||||
public Request(long taskId) {
|
||||
this.taskId = taskId;
|
||||
}
|
||||
|
||||
public void setTaskId(long taskId) {
|
||||
this.taskId = taskId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
taskId = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeLong(taskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return taskId == request.taskId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse {
|
||||
public Response() {
|
||||
super();
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
AcknowledgedResponse that = (AcknowledgedResponse) o;
|
||||
return isAcknowledged() == that.isAcknowledged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(isAcknowledged());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<StartPersistentTaskAction.Request,
|
||||
StartPersistentTaskAction.Response, StartPersistentTaskAction.RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
|
||||
public final RequestBuilder setTaskId(long taskId) {
|
||||
request.setTaskId(taskId);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
private final PersistentTasksClusterService persistentTasksClusterService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
PersistentTasksClusterService persistentTasksClusterService,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, Request::new);
|
||||
this.persistentTasksClusterService = persistentTasksClusterService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
// Cluster is not affected but we look up repositories in metadata
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
|
||||
persistentTasksClusterService.startPersistentTask(request.taskId, new ActionListener<Empty>() {
|
||||
@Override
|
||||
public void onResponse(Empty empty) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -27,12 +27,13 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request,
|
||||
UpdatePersistentTaskStatusAction.Response,
|
||||
PersistentTaskResponse,
|
||||
UpdatePersistentTaskStatusAction.RequestBuilder> {
|
||||
|
||||
public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction();
|
||||
|
@ -48,8 +49,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
public PersistentTaskResponse newResponse() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
public static class Request extends MasterNodeRequest<Request> {
|
||||
|
@ -118,42 +119,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
}
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse {
|
||||
public Response() {
|
||||
super();
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
AcknowledgedResponse that = (AcknowledgedResponse) o;
|
||||
return isAcknowledged() == that.isAcknowledged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(isAcknowledged());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<UpdatePersistentTaskStatusAction.Request,
|
||||
UpdatePersistentTaskStatusAction.Response, UpdatePersistentTaskStatusAction.RequestBuilder> {
|
||||
PersistentTaskResponse, UpdatePersistentTaskStatusAction.RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, UpdatePersistentTaskStatusAction action) {
|
||||
super(client, action, new Request());
|
||||
|
@ -171,7 +138,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
|
||||
|
||||
private final PersistentTasksClusterService persistentTasksClusterService;
|
||||
|
||||
|
@ -191,8 +158,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
protected PersistentTaskResponse newResponse() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -202,12 +169,13 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
}
|
||||
|
||||
@Override
|
||||
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
|
||||
protected final void masterOperation(final Request request, ClusterState state,
|
||||
final ActionListener<PersistentTaskResponse> listener) {
|
||||
persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status,
|
||||
new ActionListener<Empty>() {
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(Empty empty) {
|
||||
listener.onResponse(new Response(true));
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
listener.onResponse(new PersistentTaskResponse(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed;
|
|||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -46,7 +47,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
|
@ -318,8 +318,8 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
when(task.getEndTime()).thenReturn(endTime);
|
||||
doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("rawtypes")
|
||||
PersistentTaskOperationListener listener = (PersistentTaskOperationListener) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(0L);
|
||||
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(mock(PersistentTask.class));
|
||||
return null;
|
||||
}).when(task).updatePersistentStatus(any(), any());
|
||||
return task;
|
||||
|
@ -334,8 +334,8 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
task = spy(task);
|
||||
doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("rawtypes")
|
||||
PersistentTaskOperationListener listener = (PersistentTaskOperationListener) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(0L);
|
||||
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(mock(PersistentTask.class));
|
||||
return null;
|
||||
}).when(task).updatePersistentStatus(any(), any());
|
||||
return task;
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Response;
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
|
||||
public class CancelPersistentTaskResponseTests extends AbstractStreamableTestCase<Response> {
|
||||
|
||||
@Override
|
||||
protected Response createTestInstance() {
|
||||
return new Response(randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response createBlankInstance() {
|
||||
return new Response();
|
||||
}
|
||||
}
|
|
@ -5,10 +5,10 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutorIT.PersistentTaskOperationFuture;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
|
||||
|
@ -45,16 +45,16 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
|
|||
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
|
||||
int numberOfTasks = randomIntBetween(1, 10);
|
||||
long[] taskIds = new long[numberOfTasks];
|
||||
List<PersistentTaskOperationFuture> futures = new ArrayList<>(numberOfTasks);
|
||||
List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks);
|
||||
|
||||
for (int i = 0; i < numberOfTasks; i++) {
|
||||
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
|
||||
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
|
||||
futures.add(future);
|
||||
service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
service.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numberOfTasks; i++) {
|
||||
taskIds[i] = futures.get(i).get();
|
||||
taskIds[i] = futures.get(i).get().getId();
|
||||
}
|
||||
|
||||
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status;
|
||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
|
@ -52,22 +53,20 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
assertNoRunningTasks();
|
||||
}
|
||||
|
||||
public static class PersistentTaskOperationFuture extends PlainActionFuture<Long> implements WaitForPersistentTaskStatusListener {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
set(taskId);
|
||||
}
|
||||
public static class WaitForPersistentTaskStatusFuture<Request extends PersistentTaskRequest>
|
||||
extends PlainActionFuture<PersistentTask<Request>>
|
||||
implements WaitForPersistentTaskStatusListener<Request> {
|
||||
}
|
||||
|
||||
public void testPersistentActionFailure() throws Exception {
|
||||
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
|
||||
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
long taskId = future.get();
|
||||
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
|
||||
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
long taskId = future.get().getId();
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
|
||||
.getTasks().size(), equalTo(1));
|
||||
.getTasks().size(), equalTo(1));
|
||||
});
|
||||
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
@ -91,13 +90,13 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
|
||||
public void testPersistentActionCompletion() throws Exception {
|
||||
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
|
||||
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
long taskId = future.get();
|
||||
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
|
||||
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
long taskId = future.get().getId();
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
|
||||
.getTasks().size(), equalTo(1));
|
||||
.getTasks().size(), equalTo(1));
|
||||
});
|
||||
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
@ -110,11 +109,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
|
||||
public void testPersistentActionWithNoAvailableNode() throws Exception {
|
||||
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
|
||||
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
|
||||
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
|
||||
TestRequest testRequest = new TestRequest("Blah");
|
||||
testRequest.setExecutorNodeAttr("test");
|
||||
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, testRequest, future);
|
||||
long taskId = future.get();
|
||||
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, testRequest, future);
|
||||
long taskId = future.get().getId();
|
||||
|
||||
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
|
||||
String newNode = internalCluster().startNode(nodeSettings);
|
||||
|
@ -122,7 +121,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
|
||||
.size(), equalTo(1));
|
||||
.size(), equalTo(1));
|
||||
});
|
||||
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
@ -139,21 +138,21 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
});
|
||||
|
||||
// Remove the persistent task
|
||||
PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.removeTask(taskId, removeFuture);
|
||||
assertEquals(removeFuture.get(), (Long) taskId);
|
||||
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
|
||||
persistentTasksService.cancelPersistentTask(taskId, removeFuture);
|
||||
assertEquals(removeFuture.get().getId(), taskId);
|
||||
}
|
||||
|
||||
public void testPersistentActionStatusUpdate() throws Exception {
|
||||
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
|
||||
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
long taskId = future.get();
|
||||
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
|
||||
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
long taskId = future.get().getId();
|
||||
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
|
||||
.size(), equalTo(1));
|
||||
.size(), equalTo(1));
|
||||
});
|
||||
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
@ -171,27 +170,27 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
.get().getTasks().size(), equalTo(1));
|
||||
|
||||
int finalI = i;
|
||||
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
|
||||
WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId,
|
||||
task -> task != null && task.isCurrentStatus()&& task.getStatus().toString() != null &&
|
||||
task -> task != null && task.isCurrentStatus() && task.getStatus().toString() != null &&
|
||||
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
|
||||
TimeValue.timeValueSeconds(10), future1);
|
||||
assertThat(future1.get(), equalTo(taskId));
|
||||
assertThat(future1.get().getId(), equalTo(taskId));
|
||||
}
|
||||
|
||||
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
|
||||
WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId,
|
||||
task -> false, TimeValue.timeValueMillis(10), future1);
|
||||
|
||||
assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
|
||||
|
||||
PersistentTaskOperationFuture failedUpdateFuture = new PersistentTaskOperationFuture();
|
||||
PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
|
||||
persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture);
|
||||
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
|
||||
" and allocation id -1 doesn't exist");
|
||||
|
||||
// Wait for the task to disappear
|
||||
PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture();
|
||||
WaitForPersistentTaskStatusFuture<?> future2 = new WaitForPersistentTaskStatusFuture<>();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
|
||||
|
||||
logger.info("Completing the running task");
|
||||
|
@ -199,7 +198,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
|
||||
.get().getTasks().size(), equalTo(1));
|
||||
|
||||
assertThat(future2.get(), equalTo(taskId));
|
||||
assertThat(future2.get(), nullValue());
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -5,17 +5,37 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
|
||||
|
||||
public class PersistentTasksExecutorResponseTests extends AbstractStreamableTestCase<PersistentTaskResponse> {
|
||||
|
||||
@Override
|
||||
protected PersistentTaskResponse createTestInstance() {
|
||||
return new PersistentTaskResponse(randomLong());
|
||||
if (randomBoolean()) {
|
||||
return new PersistentTaskResponse(
|
||||
new PersistentTask<PersistentTaskRequest>(randomLong(), randomAsciiOfLength(10),
|
||||
new TestPersistentTasksPlugin.TestRequest("test"),
|
||||
PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
|
||||
} else {
|
||||
return new PersistentTaskResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PersistentTaskResponse createBlankInstance() {
|
||||
return new PersistentTaskResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
return new NamedWriteableRegistry(Collections.singletonList(
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestRequest::new)
|
||||
));
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -21,8 +22,8 @@ import org.elasticsearch.tasks.TaskManager;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -149,10 +150,10 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
public void testTaskCancellation() {
|
||||
ClusterService clusterService = createClusterService();
|
||||
AtomicLong capturedTaskId = new AtomicLong();
|
||||
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
|
||||
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
|
||||
@Override
|
||||
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) {
|
||||
public void sendCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
|
||||
capturedTaskId.set(taskId);
|
||||
capturedListener.set(listener);
|
||||
}
|
||||
|
@ -203,7 +204,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
// That should trigger cancellation request
|
||||
assertThat(capturedTaskId.get(), equalTo(localId));
|
||||
// Notify successful cancellation
|
||||
capturedListener.get().onResponse(localId);
|
||||
capturedListener.get().onResponse(new CancelTasksResponse());
|
||||
|
||||
// finish or fail task
|
||||
if (randomBoolean()) {
|
||||
|
@ -226,11 +227,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
ClusterService clusterService = createClusterService();
|
||||
AtomicLong capturedTaskId = new AtomicLong(-1L);
|
||||
AtomicReference<Exception> capturedException = new AtomicReference<>();
|
||||
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
|
||||
AtomicReference<ActionListener<PersistentTask<?>>> capturedListener = new AtomicReference<>();
|
||||
PersistentTasksService persistentTasksService =
|
||||
new PersistentTasksService(Settings.EMPTY, clusterService, null, null) {
|
||||
@Override
|
||||
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) {
|
||||
public void sendCompletionNotification(long taskId, Exception failure,
|
||||
ActionListener<PersistentTask<?>> listener) {
|
||||
capturedTaskId.set(taskId);
|
||||
capturedException.set(failure);
|
||||
capturedListener.set(listener);
|
||||
|
@ -283,7 +285,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
long id = taskManager.getTasks().values().iterator().next().getParentTaskId().getId();
|
||||
|
||||
// This time acknowledge notification
|
||||
capturedListener.get().onResponse(id);
|
||||
capturedListener.get().onResponse(
|
||||
new PersistentTask<>(1, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, new TestRequest(), null));
|
||||
|
||||
// Reallocate failed task to another node
|
||||
state = newClusterState;
|
||||
|
|
|
@ -49,7 +49,7 @@ import org.elasticsearch.transport.TransportResponse.Empty;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -80,7 +80,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
return Arrays.asList(
|
||||
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
|
||||
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
|
||||
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
|
||||
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
|
||||
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
|
||||
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
|
||||
|
@ -93,8 +92,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
NamedXContentRegistry xContentRegistry) {
|
||||
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
|
||||
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService,
|
||||
clusterService);
|
||||
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
|
||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
|
||||
Collections.singletonList(testPersistentAction));
|
||||
return Arrays.asList(
|
||||
|
@ -306,9 +304,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
public static final String NAME = "cluster:admin/persistent/test";
|
||||
private final ClusterService clusterService;
|
||||
|
||||
public TestPersistentTasksExecutor(Settings settings, PersistentTasksService persistentTasksService,
|
||||
ClusterService clusterService) {
|
||||
super(settings, NAME, persistentTasksService, ThreadPool.Names.GENERIC);
|
||||
public TestPersistentTasksExecutor(Settings settings, ClusterService clusterService) {
|
||||
super(settings, NAME, ThreadPool.Names.GENERIC);
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
|
@ -354,9 +351,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Status status = new Status("phase " + phase.incrementAndGet());
|
||||
logger.info("updating the task status to {}", status);
|
||||
task.updatePersistentStatus(status, new PersistentTaskOperationListener() {
|
||||
task.updatePersistentStatus(status, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.info("updating was successful");
|
||||
latch.countDown();
|
||||
}
|
||||
|
|
|
@ -139,7 +139,6 @@ indices:internal/data/write/mldeletebyquery
|
|||
cluster:internal/xpack/ml/job/update/process
|
||||
cluster:admin/xpack/ml/delete_expired_data
|
||||
cluster:admin/persistent/create
|
||||
cluster:admin/persistent/start
|
||||
cluster:admin/persistent/completion
|
||||
cluster:admin/persistent/update_status
|
||||
cluster:admin/persistent/remove
|
||||
|
|
Loading…
Reference in New Issue