Simplify names of PersistentTasks-related classes

PersistentTask -> NodePersistentTask
PersistentTasksInProgress -> PersistentTasks
PersistentTaskInProgress -> PersistentTask

Original commit: elastic/x-pack-elasticsearch@0947dbca56
This commit is contained in:
Igor Motov 2017-02-27 11:37:42 -07:00
parent ec4de10ee2
commit 377c1ec2b4
39 changed files with 436 additions and 441 deletions

View File

@ -127,7 +127,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
@ -212,8 +212,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
// Custom metadata
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom),
// Persistent action requests
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new),
@ -233,8 +233,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
// Custom metadata
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
parser -> MlMetadata.ML_METADATA_PARSER.parse(parser, null).build()),
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE),
PersistentTasksInProgress::fromXContent),
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasks.TYPE),
PersistentTasks::fromXContent),
// Persistent action requests
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(StartDatafeedAction.NAME),

View File

@ -32,8 +32,8 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.Collection;
@ -236,7 +236,7 @@ public class MlMetadata implements MetaData.Custom {
return this;
}
public Builder deleteJob(String jobId, PersistentTasksInProgress tasks) {
public Builder deleteJob(String jobId, PersistentTasks tasks) {
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed ["
@ -281,14 +281,14 @@ public class MlMetadata implements MetaData.Custom {
}
}
public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksInProgress persistentTasksInProgress) {
public Builder updateDatafeed(DatafeedUpdate update, PersistentTasks persistentTasks) {
String datafeedId = update.getId();
DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId);
if (oldDatafeedConfig == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId,
DatafeedState.STARTED), datafeedId, persistentTasksInProgress);
DatafeedState.STARTED), datafeedId, persistentTasks);
DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig);
if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) {
checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId());
@ -299,13 +299,13 @@ public class MlMetadata implements MetaData.Custom {
return this;
}
public Builder removeDatafeed(String datafeedId, PersistentTasksInProgress persistentTasksInProgress) {
public Builder removeDatafeed(String datafeedId, PersistentTasks persistentTasks) {
DatafeedConfig datafeed = datafeeds.get(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
DatafeedState.STARTED), datafeedId, persistentTasksInProgress);
DatafeedState.STARTED), datafeedId, persistentTasks);
datafeeds.remove(datafeedId);
return this;
}
@ -314,13 +314,13 @@ public class MlMetadata implements MetaData.Custom {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
}
private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksInProgress persistentTasksInProgress) {
if (persistentTasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = t -> {
private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasks persistentTasks) {
if (persistentTasks != null) {
Predicate<PersistentTask<?>> predicate = t -> {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) {
if (persistentTasks.tasksExist(StartDatafeedAction.NAME, predicate)) {
throw ExceptionsHelper.conflictStatusException(msg.get());
}
}
@ -344,7 +344,7 @@ public class MlMetadata implements MetaData.Custom {
return new MlMetadata(jobs, datafeeds);
}
public void markJobAsDeleted(String jobId, PersistentTasksInProgress tasks) {
public void markJobAsDeleted(String jobId, PersistentTasks tasks) {
Job job = jobs.get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
@ -371,14 +371,13 @@ public class MlMetadata implements MetaData.Custom {
}
@Nullable
public static PersistentTasksInProgress.PersistentTaskInProgress<?> getJobTask(String jobId,
@Nullable PersistentTasksInProgress tasks) {
public static PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasks tasks) {
if (tasks != null) {
Predicate<PersistentTasksInProgress.PersistentTaskInProgress<?>> p = t -> {
Predicate<PersistentTask<?>> p = t -> {
OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest();
return storedRequest.getJobId().equals(jobId);
};
for (PersistentTasksInProgress.PersistentTaskInProgress<?> task : tasks.findTasks(OpenJobAction.NAME, p)) {
for (PersistentTask<?> task : tasks.findTasks(OpenJobAction.NAME, p)) {
return task;
}
}
@ -386,22 +385,21 @@ public class MlMetadata implements MetaData.Custom {
}
@Nullable
public static PersistentTasksInProgress.PersistentTaskInProgress<?> getDatafeedTask(String datafeedId,
@Nullable PersistentTasksInProgress tasks) {
public static PersistentTask<?> getDatafeedTask(String datafeedId, @Nullable PersistentTasks tasks) {
if (tasks != null) {
Predicate<PersistentTasksInProgress.PersistentTaskInProgress<?>> p = t -> {
Predicate<PersistentTask<?>> p = t -> {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
for (PersistentTasksInProgress.PersistentTaskInProgress<?> task : tasks.findTasks(StartDatafeedAction.NAME, p)) {
for (PersistentTask<?> task : tasks.findTasks(StartDatafeedAction.NAME, p)) {
return task;
}
}
return null;
}
public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getJobTask(jobId, tasks);
public static JobState getJobState(String jobId, @Nullable PersistentTasks tasks) {
PersistentTask<?> task = getJobTask(jobId, tasks);
if (task != null && task.getStatus() != null) {
JobState jobTaskState = (JobState) task.getStatus();
if (jobTaskState != null) {
@ -412,8 +410,8 @@ public class MlMetadata implements MetaData.Custom {
return JobState.CLOSED;
}
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getDatafeedTask(datafeedId, tasks);
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasks tasks) {
PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
if (task != null && task.getStatus() != null) {
return (DatafeedState) task.getStatus();
} else {

View File

@ -41,8 +41,8 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.Date;
@ -278,19 +278,19 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
static PersistentTaskInProgress<?> validateAndFindTask(String jobId, ClusterState state) {
static PersistentTask<?> validateAndFindTask(String jobId, ClusterState state) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (mlMetadata.getJobs().containsKey(jobId) == false) {
throw ExceptionsHelper.missingJobException(jobId);
}
PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE);
if (tasks != null) {
Predicate<PersistentTaskInProgress<?>> p = t -> {
Predicate<PersistentTask<?>> p = t -> {
OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest();
return storedRequest.getJobId().equals(jobId);
};
for (PersistentTaskInProgress<?> task : tasks.findTasks(OpenJobAction.NAME, p)) {
for (PersistentTask<?> task : tasks.findTasks(OpenJobAction.NAME, p)) {
OpenJobAction.Request storedRequest = (OpenJobAction.Request) task.getRequest();
if (storedRequest.getJobId().equals(jobId)) {
JobState jobState = (JobState) task.getStatus();
@ -307,13 +307,13 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
static ClusterState moveJobToClosingState(String jobId, ClusterState currentState) {
PersistentTaskInProgress<?> task = validateAndFindTask(jobId, currentState);
PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> updatedTasks = new HashMap<>(currentTasks.taskMap());
PersistentTaskInProgress<?> taskToUpdate = currentTasks.getTask(task.getId());
taskToUpdate = new PersistentTaskInProgress<>(taskToUpdate, JobState.CLOSING);
PersistentTask<?> task = validateAndFindTask(jobId, currentState);
PersistentTasks currentTasks = currentState.getMetaData().custom(PersistentTasks.TYPE);
Map<Long, PersistentTask<?>> updatedTasks = new HashMap<>(currentTasks.taskMap());
PersistentTask<?> taskToUpdate = currentTasks.getTask(task.getId());
taskToUpdate = new PersistentTask<>(taskToUpdate, JobState.CLOSING);
updatedTasks.put(taskToUpdate.getId(), taskToUpdate);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(currentTasks.getCurrentId(), updatedTasks);
PersistentTasks newTasks = new PersistentTasks(currentTasks.getCurrentId(), updatedTasks);
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
@ -325,7 +325,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return builder
.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, newTasks))
.putCustom(PersistentTasks.TYPE, newTasks))
.build();
}
}

View File

@ -32,7 +32,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.io.IOException;
import java.util.Objects;
@ -170,10 +170,10 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress persistentTasksInProgress =
state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks persistentTasks =
state.getMetaData().custom(PersistentTasks.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
.removeDatafeed(request.getDatafeedId(), persistentTasksInProgress).build();
.removeDatafeed(request.getDatafeedId(), persistentTasks).build();
return ClusterState.builder(state).metaData(
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
.build();

View File

@ -38,8 +38,8 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.ArrayList;
@ -304,14 +304,14 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
Map<String, DatafeedStats> results = new HashMap<>();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasksInProgress = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = state.getMetaData().custom(PersistentTasks.TYPE);
if (request.getDatafeedId().equals(ALL) == false && mlMetadata.getDatafeed(request.getDatafeedId()) == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
for (DatafeedConfig datafeedConfig : mlMetadata.getDatafeeds().values()) {
if (request.getDatafeedId().equals(ALL) || datafeedConfig.getId().equals(request.getDatafeedId())) {
PersistentTaskInProgress<?> task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress);
PersistentTask<?> task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress);
DatafeedState datafeedState = MlMetadata.getDatafeedState(request.getDatafeedId(), tasksInProgress);
DiscoveryNode node = null;
String explanation = null;

View File

@ -44,7 +44,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.io.IOException;
import java.util.ArrayList;
@ -394,10 +394,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
String jobId = task.getJobId();
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(jobId);
if (stats.isPresent()) {
PersistentTasksInProgress.PersistentTaskInProgress<?> pTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasks.PersistentTask<?> pTask = MlMetadata.getJobTask(jobId, tasks);
DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
JobState jobState = MlMetadata.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
@ -420,13 +420,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
AtomicInteger counter = new AtomicInteger(jobIds.size());
AtomicArray<Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE);
for (int i = 0; i < jobIds.size(); i++) {
int slot = i;
String jobId = jobIds.get(i);
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlMetadata.getJobState(request.jobId, tasks);
PersistentTasksInProgress.PersistentTaskInProgress<?> pTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasks.PersistentTask<?> pTask = MlMetadata.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();

View File

@ -54,10 +54,10 @@ import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
@ -218,7 +218,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
}
public static class JobTask extends PersistentTask {
public static class JobTask extends NodePersistentTask {
private final String jobId;
private volatile Consumer<String> cancelHandler;
@ -320,12 +320,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
@Override
public void validate(Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
OpenJobAction.validate(request.getJobId(), mlMetadata, tasks, clusterState.nodes());
}
@Override
protected void nodeOperation(PersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
protected void nodeOperation(NodePersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> {
if (e1 != null) {
listener.onFailure(e1);
@ -376,7 +376,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
* Fail fast before trying to update the job state on master node if the job doesn't exist or its state
* is not what it should be.
*/
static void validate(String jobId, MlMetadata mlMetadata, @Nullable PersistentTasksInProgress tasks, DiscoveryNodes nodes) {
static void validate(String jobId, MlMetadata mlMetadata, @Nullable PersistentTasks tasks, DiscoveryNodes nodes) {
Job job = mlMetadata.getJobs().get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
@ -385,7 +385,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
throw new ElasticsearchStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted",
RestStatus.CONFLICT);
}
PersistentTaskInProgress<?> task = MlMetadata.getJobTask(jobId, tasks);
PersistentTask<?> task = MlMetadata.getJobTask(jobId, tasks);
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (task != null && jobState == JobState.OPENED) {
if (task.isAssigned() == false) {
@ -418,7 +418,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
long maxAvailable = Long.MIN_VALUE;
List<String> reasons = new LinkedList<>();
DiscoveryNode minLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks persistentTasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey());
@ -431,9 +431,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
long numberOfAssignedJobs;
int numberOfAllocatingJobs;
if (persistentTasksInProgress != null) {
numberOfAssignedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
numberOfAllocatingJobs = persistentTasksInProgress.findTasks(OpenJobAction.NAME, task -> {
if (persistentTasks != null) {
numberOfAssignedJobs = persistentTasks.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
numberOfAllocatingJobs = persistentTasks.findTasks(OpenJobAction.NAME, task -> {
if (node.getId().equals(task.getExecutorNode()) == false) {
return false;
}

View File

@ -53,10 +53,10 @@ import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
@ -225,7 +225,7 @@ public class StartDatafeedAction
}
}
public static class DatafeedTask extends PersistentTask {
public static class DatafeedTask extends NodePersistentTask {
private final String datafeedId;
private final long startTime;
@ -327,14 +327,15 @@ public class StartDatafeedAction
@Override
public void validate(Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
DiscoveryNodes nodes = clusterState.getNodes();
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes);
}
@Override
protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) persistentTask;
protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request,
ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask;
datafeedJobRunner.run(datafeedTask,
(error) -> {
if (error != null) {
@ -372,7 +373,7 @@ public class StartDatafeedAction
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, DiscoveryNodes nodes) {
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasks tasks, DiscoveryNodes nodes) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
@ -388,7 +389,7 @@ public class StartDatafeedAction
RestStatus.CONFLICT, JobState.OPENED, jobState);
}
PersistentTaskInProgress<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
if (datafeedTask != null && datafeedState == DatafeedState.STARTED) {
if (datafeedTask.isAssigned() == false) {
@ -410,11 +411,11 @@ public class StartDatafeedAction
public static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
DiscoveryNodes nodes = clusterState.getNodes();
PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
PersistentTask<?> jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
if (jobTask == null) {
String reason = "cannot start datafeed [" + datafeed.getId() + "], job task doesn't yet exist";
logger.debug(reason);

View File

@ -44,7 +44,8 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.List;
@ -215,7 +216,7 @@ public class StopDatafeedAction
ClusterState state = clusterService.state();
MetaData metaData = state.metaData();
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = metaData.custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = metaData.custom(PersistentTasks.TYPE);
String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks);
request.setNodes(nodeId);
ActionListener<Response> finalListener =
@ -260,12 +261,12 @@ public class StopDatafeedAction
}
}
static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks) {
static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasks tasks) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
}
PersistentTasksInProgress.PersistentTaskInProgress<?> task = MlMetadata.getDatafeedTask(datafeedId, tasks);
PersistentTask<?> task = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (task == null || task.getStatus() != DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED);

View File

@ -30,7 +30,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.io.IOException;
import java.util.List;
@ -62,8 +62,8 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
// node running the job task.
ClusterState state = clusterService.state();
JobManager.getJobOrThrowIfUnknown(state, jobId);
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE);
PersistentTasks.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
@ -76,7 +76,7 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
@Override
protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks);
if (jobState == JobState.OPENED) {
innerTaskOperation(request, task, listener);

View File

@ -32,7 +32,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.io.IOException;
import java.util.Objects;
@ -161,10 +161,10 @@ public class UpdateDatafeedAction extends Action<UpdateDatafeedAction.Request, P
public ClusterState execute(ClusterState currentState) throws Exception {
DatafeedUpdate update = request.getUpdate();
MlMetadata currentMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress persistentTasksInProgress =
state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks persistentTasks =
state.getMetaData().custom(PersistentTasks.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
.updateDatafeed(update, persistentTasksInProgress).build();
.updateDatafeed(update, persistentTasks).build();
updatedDatafeed = newMetadata.getDatafeed(update.getId());
return ClusterState.builder(currentState).metaData(
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()).build();

View File

@ -35,7 +35,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.io.IOException;
import java.util.Objects;
@ -168,7 +168,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
}
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE);
boolean jobIsOpen = MlMetadata.getJobState(request.getJobId(), tasks) == JobState.OPENED;
semaphoreByJob.computeIfAbsent(request.getJobId(), id -> new Semaphore(1)).acquire();

View File

@ -34,7 +34,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import java.time.Duration;

View File

@ -34,7 +34,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.util.Collections;
import java.util.List;
@ -127,7 +127,7 @@ public class JobManager extends AbstractComponent {
}
public JobState getJobState(String jobId) {
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE);
return MlMetadata.getJobState(jobId, tasks);
}
@ -281,7 +281,7 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksInProgress.TYPE));
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasks.TYPE));
return buildNewClusterState(currentState, builder);
}
});
@ -307,7 +307,7 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = currentState.metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = currentState.metaData().custom(PersistentTasks.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
builder.markJobAsDeleted(jobId, tasks);
return buildNewClusterState(currentState, builder);

View File

@ -14,7 +14,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -35,7 +35,7 @@ public class DatafeedStateObserver {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
Predicate<ClusterState> predicate = (newState) -> {
PersistentTasksInProgress tasks = newState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = newState.getMetaData().custom(PersistentTasks.TYPE);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
return datafeedState == expectedState;
};

View File

@ -16,7 +16,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -87,7 +87,7 @@ public class JobStateObserver {
@Override
public boolean test(ClusterState newState) {
PersistentTasksInProgress tasks = newState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = newState.getMetaData().custom(PersistentTasks.TYPE);
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (jobState == JobState.FAILED) {
failed = true;

View File

@ -10,14 +10,14 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
/**
* Task that returns additional state information
* Represents a executor node operation that corresponds to a persistent task
*/
public class PersistentTask extends CancellableTask {
public class NodePersistentTask extends CancellableTask {
private Provider<Status> statusProvider;
private long persistentTaskId;
public PersistentTask(long id, String type, String action, String description, TaskId parentTask) {
public NodePersistentTask(long id, String type, String action, String description, TaskId parentTask) {
super(id, type, action, description, parentTask);
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.HashMap;
@ -62,15 +62,15 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
@Override
public void clusterChanged(ClusterChangedEvent event) {
PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress previousTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE);
PersistentTasks previousTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE);
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
// We have some changes let's check if they are related to our node
String localNodeId = event.state().getNodes().getLocalNodeId();
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet());
if (tasks != null) {
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);
@ -111,10 +111,10 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
}
private <Request extends PersistentActionRequest> void startTask(PersistentTaskInProgress<Request> taskInProgress) {
private <Request extends PersistentActionRequest> void startTask(PersistentTask<Request> taskInProgress) {
PersistentActionRegistry.PersistentActionHolder<Request> holder =
persistentActionRegistry.getPersistentActionHolderSafe(taskInProgress.getAction());
PersistentTask task = (PersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]",
NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]",
taskInProgress.getRequest());
boolean processed = false;
try {
@ -283,23 +283,23 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
}
private static class RunningPersistentTask implements Provider<Task.Status> {
private final PersistentTask task;
private final NodePersistentTask task;
private final long id;
private final AtomicReference<State> state;
@Nullable
private Exception failure;
RunningPersistentTask(PersistentTask task, long id) {
RunningPersistentTask(NodePersistentTask task, long id) {
this(task, id, State.STARTED);
}
RunningPersistentTask(PersistentTask task, long id, State state) {
RunningPersistentTask(NodePersistentTask task, long id, State state) {
this.task = task;
this.id = id;
this.state = new AtomicReference<>(state);
}
public PersistentTask getTask() {
public NodePersistentTask getTask() {
return task;
}

View File

@ -21,7 +21,7 @@ public class PersistentActionExecutor {
}
public <Request extends PersistentActionRequest> void executeAction(Request request,
PersistentTask task,
NodePersistentTask task,
PersistentActionRegistry.PersistentActionHolder<Request> holder,
ActionListener<Empty> listener) {
threadPool.executor(holder.getExecutor()).execute(new AbstractRunnable() {

View File

@ -17,6 +17,6 @@ import org.elasticsearch.tasks.TaskId;
public abstract class PersistentActionRequest extends ActionRequest implements NamedWriteable, ToXContent {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new PersistentTask(id, type, action, getDescription(), parentTaskId);
return new NodePersistentTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -13,15 +13,14 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.util.Objects;
@ -56,7 +55,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
public ClusterState execute(ClusterState currentState) throws Exception {
final Assignment assignment;
if (stopped) {
assignment = PersistentTasksInProgress.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere
assignment = PersistentTasks.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere
} else {
assignment = getAssignement(action, currentState, request);
}
@ -71,7 +70,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(
((PersistentTasksInProgress) newState.getMetaData().custom(PersistentTasksInProgress.TYPE)).getCurrentId());
((PersistentTasks) newState.getMetaData().custom(PersistentTasks.TYPE)).getCurrentId());
}
});
}
@ -95,7 +94,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
PersistentTasks.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
@ -133,7 +132,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
PersistentTasks.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress
.assignTask(id, (action, request) -> getAssignement(action, currentState, request)));
@ -164,7 +163,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
PersistentTasks.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress.removeTask(id));
} else {
@ -195,7 +194,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
PersistentTasks.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress.updateTaskStatus(id, status));
} else {
@ -239,15 +238,15 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
}
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress prevTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE);
PersistentTasks prevTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE);
if (tasks != null && (Objects.equals(tasks, prevTasks) == false ||
event.nodesChanged() ||
event.routingTableChanged() ||
event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
// We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false;
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
@ -287,13 +286,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
}
static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) {
PersistentTasksInProgress tasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = currentState.getMetaData().custom(PersistentTasks.TYPE);
ClusterState clusterState = currentState;
DiscoveryNodes nodes = currentState.nodes();
if (tasks != null) {
logger.trace("reassigning {} persistent tasks", tasks.tasks().size());
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
Assignment assignment = decider.getAssignment(task.getAction(), clusterState, task.getRequest());
@ -316,14 +315,14 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
return clusterState;
}
private static PersistentTasksInProgress.Builder builder(ClusterState currentState) {
return PersistentTasksInProgress.builder(currentState.getMetaData().custom(PersistentTasksInProgress.TYPE));
private static PersistentTasks.Builder builder(ClusterState currentState) {
return PersistentTasks.builder(currentState.getMetaData().custom(PersistentTasks.TYPE));
}
private static ClusterState update(ClusterState currentState, PersistentTasksInProgress.Builder tasksInProgress) {
private static ClusterState update(ClusterState currentState, PersistentTasks.Builder tasksInProgress) {
if (tasksInProgress.isChanged()) {
return ClusterState.builder(currentState).metaData(
MetaData.builder(currentState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasksInProgress.build())
MetaData.builder(currentState.metaData()).putCustom(PersistentTasks.TYPE, tasksInProgress.build())
).build();
} else {
return currentState;

View File

@ -43,25 +43,25 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
/**
* A cluster state record that contains a list of all running persistent tasks
*/
public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
public final class PersistentTasks extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
public static final String TYPE = "persistent_tasks";
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
// TODO: Implement custom Diff for tasks
private final Map<Long, PersistentTaskInProgress<?>> tasks;
private final Map<Long, PersistentTask<?>> tasks;
private final long currentId;
public PersistentTasksInProgress(long currentId, Map<Long, PersistentTaskInProgress<?>> tasks) {
public PersistentTasks(long currentId, Map<Long, PersistentTask<?>> tasks) {
this.currentId = currentId;
this.tasks = tasks;
}
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_IN_PROGRESS_PARSER = new ObjectParser<>(TYPE, Builder::new);
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new);
private static final ObjectParser<TaskBuilder<PersistentActionRequest>, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER =
new ObjectParser<>("running_tasks", TaskBuilder::new);
private static final ObjectParser<TaskBuilder<PersistentActionRequest>, Void> PERSISTENT_TASK_PARSER =
new ObjectParser<>("tasks", TaskBuilder::new);
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
@ -73,22 +73,20 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
static {
// Tasks parser initialization
PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id"));
PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_IN_PROGRESS_PARSER,
new ParseField("running_tasks"));
PERSISTENT_TASKS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id"));
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
// Assignment parser
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
// Task parser initialization
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareString(TaskBuilder::setAction, new ParseField("action"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects(
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setAction, new ParseField("action"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentActionRequest> taskBuilder, List<PersistentActionRequest> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one action request per task is allowed");
@ -96,7 +94,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
taskBuilder.setRequest(objects.get(0));
}, REQUEST_PARSER, new ParseField("request"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects(
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentActionRequest> taskBuilder, List<Status> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one status per task is allowed");
@ -105,31 +103,31 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
}, STATUS_PARSER, new ParseField("status"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update"));
}
public Collection<PersistentTaskInProgress<?>> tasks() {
public Collection<PersistentTask<?>> tasks() {
return this.tasks.values();
}
public Map<Long, PersistentTaskInProgress<?>> taskMap() {
public Map<Long, PersistentTask<?>> taskMap() {
return this.tasks;
}
public PersistentTaskInProgress<?> getTask(long id) {
public PersistentTask<?> getTask(long id) {
return this.tasks.get(id);
}
public Collection<PersistentTaskInProgress<?>> findTasks(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
public Collection<PersistentTask<?>> findTasks(String actionName, Predicate<PersistentTask<?>> predicate) {
return this.tasks().stream()
.filter(p -> actionName.equals(p.getAction()))
.filter(predicate)
.collect(Collectors.toList());
}
public boolean tasksExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
public boolean tasksExist(String actionName, Predicate<PersistentTask<?>> predicate) {
return this.tasks().stream()
.filter(p -> actionName.equals(p.getAction()))
.anyMatch(predicate);
@ -139,7 +137,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTasksInProgress that = (PersistentTasksInProgress) o;
PersistentTasks that = (PersistentTasks) o;
return currentId == that.currentId &&
Objects.equals(tasks, that.tasks);
}
@ -168,8 +166,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return ALL_CONTEXTS;
}
public static PersistentTasksInProgress fromXContent(XContentParser parser) throws IOException {
return PERSISTENT_TASKS_IN_PROGRESS_PARSER.parse(parser, null).build();
public static PersistentTasks fromXContent(XContentParser parser) throws IOException {
return PERSISTENT_TASKS_PARSER.parse(parser, null).build();
}
public static class Assignment {
@ -223,7 +221,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
/**
* A record that represents a single running persistent task
*/
public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable, ToXContent {
public static class PersistentTask<Request extends PersistentActionRequest> implements Writeable, ToXContent {
private final long id;
private final long allocationId;
private final String action;
@ -237,24 +235,22 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private final Long allocationIdOnLastStatusUpdate;
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
Assignment assignment) {
public PersistentTask(long id, String action, Request request, boolean stopped, boolean removeOnCompletion, Assignment assignment) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, assignment, null);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, Assignment assignment) {
public PersistentTask(PersistentTask<Request> task, boolean stopped, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
assignment, task.allocationId);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
public PersistentTask(PersistentTask<Request> task, Status status) {
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status,
task.assignment, task.allocationId);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
boolean stopped, boolean removeOnCompletion, Status status,
Assignment assignment, Long allocationIdOnLastStatusUpdate) {
private PersistentTask(long id, long allocationId, String action, Request request, boolean stopped, boolean removeOnCompletion,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
@ -269,7 +265,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
}
@SuppressWarnings("unchecked")
private PersistentTaskInProgress(StreamInput in) throws IOException {
private PersistentTask(StreamInput in) throws IOException {
id = in.readLong();
allocationId = in.readLong();
action = in.readString();
@ -299,7 +295,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTaskInProgress<?> that = (PersistentTaskInProgress<?>) o;
PersistentTask<?> that = (PersistentTask<?>) o;
return id == that.id &&
allocationId == that.allocationId &&
Objects.equals(action, that.action) &&
@ -481,8 +477,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return this;
}
public PersistentTaskInProgress<Request> build() {
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
public PersistentTask<Request> build() {
return new PersistentTask<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
assignment, allocationIdOnLastStatusUpdate);
}
}
@ -492,9 +488,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return TYPE;
}
public PersistentTasksInProgress(StreamInput in) throws IOException {
public PersistentTasks(StreamInput in) throws IOException {
currentId = in.readLong();
tasks = in.readMap(StreamInput::readLong, PersistentTaskInProgress::new);
tasks = in.readMap(StreamInput::readLong, PersistentTask::new);
}
@Override
@ -517,8 +513,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("current_id", currentId);
builder.startArray("running_tasks");
for (PersistentTaskInProgress<?> entry : tasks.values()) {
builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) {
entry.toXContent(builder, params);
}
builder.endArray();
@ -529,19 +525,19 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return new Builder();
}
public static Builder builder(PersistentTasksInProgress tasks) {
public static Builder builder(PersistentTasks tasks) {
return new Builder(tasks);
}
public static class Builder {
private final Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>();
private final Map<Long, PersistentTask<?>> tasks = new HashMap<>();
private long currentId;
private boolean changed;
public Builder() {
}
public Builder(PersistentTasksInProgress tasksInProgress) {
public Builder(PersistentTasks tasksInProgress) {
if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.tasks);
currentId = tasksInProgress.currentId;
@ -557,7 +553,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private <Request extends PersistentActionRequest> Builder setTasks(List<TaskBuilder<Request>> tasks) {
for (TaskBuilder builder : tasks) {
PersistentTaskInProgress<?> task = builder.build();
PersistentTask<?> task = builder.build();
this.tasks.put(task.getId(), task);
}
return this;
@ -572,7 +568,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
boolean removeOnCompletion, Assignment assignment) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, assignment));
tasks.put(currentId, new PersistentTask<>(currentId, action, request, stopped, removeOnCompletion, assignment));
return this;
}
@ -580,10 +576,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
* Reassigns the task to another node if the task exist
*/
public Builder reassignTask(long taskId, Assignment assignment) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
}
return this;
}
@ -597,12 +593,12 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> Builder assignTask(long taskId,
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
if (assignment.isAssigned() || taskInProgress.isStopped()) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
}
}
return this;
@ -614,11 +610,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> Builder reassignTask(long taskId,
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
}
return this;
}
@ -627,10 +623,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
* Updates the task status if the task exist
*/
public Builder updateTaskStatus(long taskId, Status status) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, status));
tasks.put(taskId, new PersistentTask<>(taskInProgress, status));
}
return this;
}
@ -651,13 +647,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
* If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped.
*/
public Builder finishTask(long taskId) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
if (taskInProgress.removeOnCompletion) {
tasks.remove(taskId);
} else {
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
}
}
return this;
@ -684,8 +680,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return changed;
}
public PersistentTasksInProgress build() {
return new PersistentTasksInProgress(currentId, Collections.unmodifiableMap(tasks));
public PersistentTasks build() {
return new PersistentTasks(currentId, Collections.unmodifiableMap(tasks));
}
}
}

View File

@ -16,7 +16,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -66,14 +66,14 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predicate<DiscoveryNode> selector) {
long minLoad = Long.MAX_VALUE;
DiscoveryNode minLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks persistentTasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
if (selector.test(node)) {
if (persistentTasksInProgress == null) {
if (persistentTasks == null) {
// We don't have any task running yet, pick the first available node
return node;
}
long numberOfTasks = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), actionName);
long numberOfTasks = persistentTasks.getNumberOfTasksOnNode(node.getId(), actionName);
if (minLoad > numberOfTasks) {
minLoad = numberOfTasks;
minLoadedNode = node;
@ -103,7 +103,7 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
* The status can be used to store the current progress of the task or provide an insight for the
* task allocator about the state of the currently running tasks.
*/
protected void updatePersistentTaskStatus(PersistentTask task, Task.Status status, ActionListener<Empty> listener) {
protected void updatePersistentTaskStatus(NodePersistentTask task, Task.Status status, ActionListener<Empty> listener) {
persistentActionService.updateStatus(task.getPersistentTaskId(), status,
new ActionListener<UpdatePersistentTaskStatusAction.Response>() {
@Override
@ -125,7 +125,7 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
* completed and will be removed from the cluster state and not restarted.
*/
protected abstract void nodeOperation(PersistentTask task, Request request, ActionListener<Empty> listener);
protected abstract void nodeOperation(NodePersistentTask task, Request request, ActionListener<Empty> listener);
public String getExecutor() {
return executor;

View File

@ -16,7 +16,7 @@
* {@link org.elasticsearch.xpack.persistent.PersistentTaskClusterService} to update cluster state with the record about running persistent
* task.
* <p>
* 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasksInProgress} in the cluster state to indicate that
* 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasks} in the cluster state to indicate that
* there is a new persistent action
* running in the system.
* <p>
@ -24,11 +24,11 @@
* the cluster state and starts execution of all new actions assigned to the node it is running on.
* <p>
* 4. If the action fails to start on the node, the {@link org.elasticsearch.xpack.persistent.PersistentActionCoordinator} uses the
* {@link org.elasticsearch.xpack.persistent.PersistentTasksInProgress} to notify the
* {@link org.elasticsearch.xpack.persistent.PersistentTasks} to notify the
* {@link org.elasticsearch.xpack.persistent.PersistentActionService}, which reassigns the action to another node in the cluster.
* <p>
* 5. If action finishes successfully on the node and calls listener.onResponse(), the corresponding persistent action is removed from the
* cluster state.
* cluster state unless .
* <p>
* 6. The {@link org.elasticsearch.xpack.persistent.RemovePersistentTaskAction} action can be also used to remove the persistent action.
*/

View File

@ -25,8 +25,8 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.Collections;
@ -35,7 +35,7 @@ import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT;
import static org.elasticsearch.xpack.persistent.PersistentTasks.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -133,7 +133,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue());
builder.deleteJob("1", new PersistentTasksInProgress(0L, Collections.emptyMap()));
builder.deleteJob("1", new PersistentTasks(0L, Collections.emptyMap()));
result = builder.build();
assertThat(result.getJobs().get("1"), nullValue());
assertThat(result.getDatafeeds().get("1"), nullValue());
@ -148,10 +148,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue());
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, "1", null, JobState.CLOSED);
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, "1", null, JobState.CLOSED);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.deleteJob("1", new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task))));
() -> builder2.deleteJob("1", new PersistentTasks(0L, Collections.singletonMap(0L, task))));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
}
@ -163,7 +163,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
builder.putDatafeed(datafeedConfig1);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder.deleteJob(job1.getId(), new PersistentTasksInProgress(0L, Collections.emptyMap())));
() -> builder.deleteJob(job1.getId(), new PersistentTasks(0L, Collections.emptyMap())));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
String expectedMsg = "Cannot delete job [" + job1.getId() + "] while datafeed [" + datafeedConfig1.getId() + "] refers to it";
assertThat(e.getMessage(), equalTo(expectedMsg));
@ -172,7 +172,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
public void testRemoveJob_failBecauseJobDoesNotExist() {
MlMetadata.Builder builder1 = new MlMetadata.Builder();
expectThrows(ResourceNotFoundException.class,
() -> builder1.deleteJob("1", new PersistentTasksInProgress(0L, Collections.emptyMap())));
() -> builder1.deleteJob("1", new PersistentTasks(0L, Collections.emptyMap())));
}
public void testCrudDatafeed() {
@ -187,7 +187,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
builder = new MlMetadata.Builder(result);
builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyMap()));
builder.removeDatafeed("datafeed1", new PersistentTasks(0, Collections.emptyMap()));
result = builder.build();
assertThat(result.getJobs().get("job_id"), sameInstance(job1));
assertThat(result.getDatafeeds().get("datafeed1"), nullValue());
@ -269,10 +269,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata beforeMetadata = builder.build();
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
PersistentTasksInProgress tasksInProgress =
new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
PersistentTasks tasksInProgress =
new PersistentTasks(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setScrollSize(5000);
@ -331,10 +331,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
PersistentTasksInProgress tasksInProgress =
new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
PersistentTasks tasksInProgress =
new PersistentTasks(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,

View File

@ -13,8 +13,8 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.util.Collections;
@ -26,14 +26,14 @@ public class CloseJobActionTests extends ESTestCase {
public void testMoveJobToClosingState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTaskInProgress<OpenJobAction.Request> task =
PersistentTask<OpenJobAction.Request> task =
createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))));
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task))));
ClusterState result = CloseJobAction.moveJobToClosingState("job_id", csBuilder.build());
PersistentTasksInProgress actualTasks = result.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks actualTasks = result.getMetaData().custom(PersistentTasks.TYPE);
assertEquals(JobState.CLOSING, actualTasks.getTask(1L).getStatus());
MlMetadata actualMetadata = result.metaData().custom(MlMetadata.TYPE);
@ -44,24 +44,24 @@ public class CloseJobActionTests extends ESTestCase {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap())));
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap())));
expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder.build()));
}
public void testMoveJobToClosingState_unexpectedJobState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(1L, "job_id", null, JobState.OPENING);
PersistentTask<OpenJobAction.Request> task = createJobTask(1L, "job_id", null, JobState.OPENING);
ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))));
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task))));
ElasticsearchStatusException result =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build()));
assertEquals("cannot close job, expected job state [opened], but got [opening]", result.getMessage());
ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap())));
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap())));
result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build()));
assertEquals("cannot close job, expected job state [opened], but got [closed]", result.getMessage());
}

View File

@ -32,9 +32,9 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.net.InetAddress;
import java.util.ArrayList;
@ -56,16 +56,16 @@ public class OpenJobActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
PersistentTask<OpenJobAction.Request> task =
createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksInProgress(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasks(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes);
task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
}
@ -94,8 +94,8 @@ public class OpenJobActionTests extends ESTestCase {
.build();
JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", jobState);
PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
PersistentTask<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", jobState);
PersistentTasks tasks1 = new PersistentTasks(1L, Collections.singletonMap(1L, task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes));
@ -103,7 +103,7 @@ public class OpenJobActionTests extends ESTestCase {
jobState = randomFrom(JobState.OPENING, JobState.CLOSING);
task = createJobTask(1L, "job_id", "_other_node_id", jobState);
PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
PersistentTasks tasks2 = new PersistentTasks(1L, Collections.singletonMap(1L, task));
e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks2, nodes));
@ -122,21 +122,21 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new Assignment("_node_id1", "test assignment")));
taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true,
taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true,
new Assignment("_node_id1", "test assignment")));
taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true,
taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true,
new Assignment("_node_id2", "test assignment")));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap);
PersistentTasks tasks = new PersistentTasks(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
metaData.putCustom(PersistentTasks.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
@ -150,7 +150,7 @@ public class OpenJobActionTests extends ESTestCase {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
for (int i = 0; i < numNodes; i++) {
String nodeId = "_node_id" + i;
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i);
@ -160,14 +160,14 @@ public class OpenJobActionTests extends ESTestCase {
taskMap.put(id, createJobTask(id, "job_id" + id, nodeId, JobState.OPENED));
}
}
PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap);
PersistentTasks tasks = new PersistentTasks(numNodes * maxRunningJobsPerNode, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
metaData.putCustom(PersistentTasks.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
@ -184,17 +184,17 @@ public class OpenJobActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new Assignment("_node_id1", "test assignment"));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
metaData.putCustom(PersistentTasks.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
@ -214,13 +214,13 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING));
taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING));
taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING));
taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING));
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(5L, taskMap);
PersistentTasks tasks = new PersistentTasks(5L, taskMap);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.nodes(nodes);
@ -228,39 +228,39 @@ public class OpenJobActionTests extends ESTestCase {
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7");
csBuilder.routingTable(routingTable.build());
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
metaData.putCustom(PersistentTasks.TYPE, tasks);
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getExecutorNode());
PersistentTaskInProgress<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
PersistentTask<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
taskMap.put(5L, lastTask);
tasks = new PersistentTasksInProgress(6L, taskMap);
tasks = new PersistentTasks(6L, taskMap);
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, new Assignment("_node_id3", "test assignment")));
tasks = new PersistentTasksInProgress(6L, taskMap);
taskMap.put(5L, new PersistentTask<>(lastTask, false, new Assignment("_node_id3", "test assignment")));
tasks = new PersistentTasks(6L, taskMap);
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null));
tasks = new PersistentTasksInProgress(6L, taskMap);
taskMap.put(5L, new PersistentTask<>(lastTask, null));
tasks = new PersistentTasks(6L, taskMap);
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because null state", result.getExecutorNode());
@ -304,11 +304,11 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals(indexToRemove, result.get(0));
}
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true,
public static PersistentTask<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true,
new Assignment(nodeId, "test assignment"));
task = new PersistentTaskInProgress<>(task, jobState);
task = new PersistentTask<>(task, jobState);
return task;
}

View File

@ -21,9 +21,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.net.InetAddress;
import java.util.Collections;
@ -33,7 +33,7 @@ import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT;
import static org.elasticsearch.xpack.persistent.PersistentTasks.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionTests extends ESTestCase {
@ -45,8 +45,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState);
PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -55,7 +55,7 @@ public class StartDatafeedActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.putCustom(PersistentTasks.TYPE, tasks))
.nodes(nodes);
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
@ -64,10 +64,10 @@ public class StartDatafeedActionTests extends ESTestCase {
"] while state [opened] is required", result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.putCustom(PersistentTasks.TYPE, tasks))
.nodes(nodes);
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertEquals("node_id", result.getExecutorNode());
@ -80,8 +80,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
String nodeId = randomBoolean() ? "node_id2" : null;
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED);
PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -90,7 +90,7 @@ public class StartDatafeedActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.putCustom(PersistentTasks.TYPE, tasks))
.nodes(nodes);
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
@ -99,10 +99,10 @@ public class StartDatafeedActionTests extends ESTestCase {
result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.putCustom(PersistentTasks.TYPE, tasks))
.nodes(nodes);
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertEquals("node_id1", result.getExecutorNode());
@ -123,10 +123,10 @@ public class StartDatafeedActionTests extends ESTestCase {
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true,
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true,
INITIAL_ASSIGNMENT);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task));
PersistentTasks tasks = new PersistentTasks(0L, Collections.singletonMap(0L, task));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1)
@ -148,15 +148,15 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
PersistentTask<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTask<StartDatafeedAction.Request> datafeedTask =
new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, new Assignment("node_id", "test assignment"));
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
taskMap.put(1L, datafeedTask);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap);
PersistentTasks tasks = new PersistentTasks(2L, taskMap);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes));
@ -176,20 +176,20 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
PersistentTask<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED);
PersistentTask<StartDatafeedAction.Request> datafeedTask =
new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, new Assignment("node_id1", "test assignment"));
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
taskMap.put(1L, datafeedTask);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap);
PersistentTasks tasks = new PersistentTasks(2L, taskMap);
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
datafeedTask = new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, INITIAL_ASSIGNMENT);
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
taskMap.put(1L, datafeedTask);
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
}

View File

@ -16,8 +16,8 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.util.Collections;
@ -47,10 +47,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
}
public void testValidate() {
PersistentTaskInProgress<?> task = new PersistentTaskInProgress<PersistentActionRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", ""));
task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
PersistentTask<?> task = new PersistentTask<PersistentActionRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasks.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
Job job = createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
@ -66,14 +66,14 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
}
public void testValidate_alreadyStopped() {
PersistentTasksInProgress tasks;
PersistentTasks tasks;
if (randomBoolean()) {
PersistentTaskInProgress<?> task = new PersistentTaskInProgress<PersistentActionRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", ""));
task = new PersistentTaskInProgress<>(task, DatafeedState.STOPPED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
PersistentTask<?> task = new PersistentTask<PersistentActionRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasks.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED);
tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
} else {
tasks = randomBoolean() ? null : new PersistentTasksInProgress(0L, Collections.emptyMap());
tasks = randomBoolean() ? null : new PersistentTasks(0L, Collections.emptyMap());
}
Job job = createDatafeedJob().build();

View File

@ -40,8 +40,8 @@ import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response;
import org.junit.Before;
@ -91,15 +91,15 @@ public class DatafeedJobRunnerTests extends ESTestCase {
Job job = createDatafeedJob().build();
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.putCustom(PersistentTasks.TYPE, tasks))
.nodes(nodes);
clusterService = mock(ClusterService.class);

View File

@ -25,8 +25,8 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException;
import java.util.Collection;
@ -164,8 +164,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
PersistentTask task = tasks.taskMap().values().iterator().next();
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
@ -215,13 +215,13 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
// Sample each cs update and keep track each time a node holds more than `maxConcurrentJobAllocations` opening jobs.
List<String> violations = new CopyOnWriteArrayList<>();
internalCluster().clusterService(nonMlNode).addListener(event -> {
PersistentTasksInProgress tasks = event.state().metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = event.state().metaData().custom(PersistentTasks.TYPE);
if (tasks == null) {
return;
}
for (DiscoveryNode node : event.state().nodes()) {
Collection<PersistentTaskInProgress<?>> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> {
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> {
return node.getId().equals(task.getExecutorNode()) &&
(task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false);
});
@ -246,9 +246,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.taskMap().values()) {
for (PersistentTask<?> task : tasks.taskMap().values()) {
assertNotNull(task.getExecutorNode());
assertEquals(JobState.OPENED, task.getStatus());
}
@ -270,9 +270,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(1, nonMlNode);
assertBusy(() -> {
ClusterState state = client(nonMlNode).admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.taskMap().values()) {
for (PersistentTask<?> task : tasks.taskMap().values()) {
assertNull(task.getExecutorNode());
}
});
@ -286,9 +286,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(1 + numMlNodes);
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.taskMap().values()) {
for (PersistentTask<?> task : tasks.taskMap().values()) {
assertNotNull(task.getExecutorNode());
assertEquals(JobState.OPENED, task.getStatus());
}
@ -331,7 +331,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
client().execute(CloseJobAction.INSTANCE, closeJobRequest);
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
assertEquals(0, tasks.taskMap().size());
});
logger.info("Stop data node");
@ -354,9 +354,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
assertEquals(1, tasks.taskMap().size());
PersistentTaskInProgress<?> task = tasks.findTasks(OpenJobAction.NAME, p -> {
PersistentTask<?> task = tasks.findTasks(OpenJobAction.NAME, p -> {
return p.getRequest() instanceof OpenJobAction.Request &&
jobId.equals(((OpenJobAction.Request) p.getRequest()).getJobId());
}).iterator().next();

View File

@ -24,8 +24,8 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -125,10 +125,10 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
disrupt.run();
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.metaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.metaData().custom(PersistentTasks.TYPE);
assertNotNull(tasks);
assertEquals(2, tasks.taskMap().size());
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
for (PersistentTask<?> task : tasks.tasks()) {
assertFalse(task.needsReassignment(clusterState.nodes()));
}

View File

@ -17,7 +17,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks;
public class TooManyJobsIT extends BaseMlIntegTestCase {
@ -49,10 +49,10 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.CLOSED);
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE);
assertEquals(1, tasks.taskMap().size());
// now just double check that the first job is still opened:
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTasks.PersistentTask task = tasks.taskMap().values().iterator().next();
assertEquals(JobState.OPENED, task.getStatus());
OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
assertEquals("1", openJobRequest.getJobId());

View File

@ -23,7 +23,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Response;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
import java.io.IOException;
@ -72,7 +72,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build();
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
PersistentTasks.Builder tasks = PersistentTasks.builder();
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
@ -91,7 +91,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
}
MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks.build());
metaData.putCustom(PersistentTasks.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
@ -288,35 +288,35 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request,
String node) {
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
PersistentTasks.Builder builder =
PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE,
builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
PersistentTasks.Builder builder =
PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE));
assertTrue(builder.hasTask(taskId));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE,
builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState removeTask(ClusterState state, long taskId) {
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
PersistentTasks.Builder builder =
PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE));
assertTrue(builder.hasTask(taskId));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE,
builder.removeTask(taskId).build())).build();
}
private class Execution {
private final PersistentActionRequest request;
private final PersistentTask task;
private final NodePersistentTask task;
private final PersistentActionRegistry.PersistentActionHolder<?> holder;
private final ActionListener<Empty> listener;
Execution(PersistentActionRequest request, PersistentTask task, PersistentActionRegistry.PersistentActionHolder<?> holder,
Execution(PersistentActionRequest request, NodePersistentTask task, PersistentActionRegistry.PersistentActionHolder<?> holder,
ActionListener<Empty> listener) {
this.request = request;
this.task = task;
@ -333,7 +333,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
}
@Override
public <Request extends PersistentActionRequest> void executeAction(Request request, PersistentTask task,
public <Request extends PersistentActionRequest> void executeAction(Request request, NodePersistentTask task,
PersistentActionRegistry.PersistentActionHolder<Request> holder,
ActionListener<Empty> listener) {
executions.add(new Execution(request, task, holder, listener));

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.persistent;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
@ -59,8 +59,8 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
}
}
final int numberOfRunningTasks = runningTasks;
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
if (numberOfRunningTasks > 0) {
@ -76,11 +76,11 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
internalCluster().fullRestart();
ensureYellow();
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE);
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasks.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
// Check that cluster state is correct
for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> task = tasksInProgress.getTask(taskIds[i]);
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
assertThat(task.isStopped(), equalTo(stopped[i]));
}
@ -93,9 +93,9 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
});
// Start all other tasks
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE);
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasks.TYPE);
for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> task = tasksInProgress.getTask(taskIds[i]);
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode());
assertThat(task.isStopped(), equalTo(stopped[i]));
@ -119,8 +119,8 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
assertBusy(() -> {
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE)).tasks(), empty());
assertThat(((PersistentTasks) internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE)).tasks(), empty());
});
}

View File

@ -112,8 +112,8 @@ public class PersistentActionIT extends ESIntegTestCase {
.setRemoveOnCompletion(false)
.setStopped(stopped).get().getTaskId();
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped));
assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue());
@ -150,8 +150,8 @@ public class PersistentActionIT extends ESIntegTestCase {
assertBusy(() -> {
// Wait for the task to be marked as stopped
PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE);
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.getTask(taskId).isStopped(), equalTo(true));
assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
@ -202,8 +202,8 @@ public class PersistentActionIT extends ESIntegTestCase {
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
@ -216,8 +216,8 @@ public class PersistentActionIT extends ESIntegTestCase {
int finalI = i;
assertBusy(() -> {
PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE);
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
@ -258,8 +258,8 @@ public class PersistentActionIT extends ESIntegTestCase {
assertThat(tasks.size(), equalTo(0));
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE)).tasks(), empty());
assertThat(((PersistentTasks) internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasks.TYPE)).tasks(), empty());
});
}

View File

@ -18,8 +18,8 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
import java.util.ArrayList;
@ -66,14 +66,14 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
public void testReassignTasksWithNoTasks() {
ClusterState clusterState = initialState();
assertThat(reassign(clusterState).metaData().custom(PersistentTasksInProgress.TYPE), nullValue());
assertThat(reassign(clusterState).metaData().custom(PersistentTasks.TYPE), nullValue());
}
public void testReassignConsidersClusterStateUpdates() {
ClusterState clusterState = initialState();
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(
clusterState.metaData().custom(PersistentTasksInProgress.TYPE));
PersistentTasks.Builder tasks = PersistentTasks.builder(
clusterState.metaData().custom(PersistentTasks.TYPE));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(2, 40);
@ -81,11 +81,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
}
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build());
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, tasks.build());
clusterState = builder.metaData(metaData).nodes(nodes).build();
ClusterState newClusterState = reassign(clusterState);
PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = newClusterState.getMetaData().custom(PersistentTasks.TYPE);
assertThat(tasksInProgress, notNullValue());
}
@ -93,8 +93,8 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
public void testReassignTasks() {
ClusterState clusterState = initialState();
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(
clusterState.metaData().custom(PersistentTasksInProgress.TYPE));
PersistentTasks.Builder tasks = PersistentTasks.builder(
clusterState.metaData().custom(PersistentTasks.TYPE));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(0, 40);
@ -118,11 +118,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
}
}
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build());
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, tasks.build());
clusterState = builder.metaData(metaData).nodes(nodes).build();
ClusterState newClusterState = reassign(clusterState);
PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = newClusterState.getMetaData().custom(PersistentTasks.TYPE);
assertThat(tasksInProgress, notNullValue());
assertThat("number of tasks shouldn't change as a result or reassignment",
@ -130,7 +130,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
int assignOneCount = 0;
for (PersistentTaskInProgress<?> task : tasksInProgress.tasks()) {
for (PersistentTask<?> task : tasksInProgress.tasks()) {
if (task.isStopped()) {
assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue());
assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getAction()));
@ -141,7 +141,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString());
logger.info(clusterState.metaData().custom(PersistentTasks.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
@ -203,7 +203,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
DiscoveryNodes nodes = clusterState.nodes();
PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasksInProgress = clusterState.getMetaData().custom(PersistentTasks.TYPE);
if (tasksInProgress.findTasks("assign_one",
task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
return randomNodeAssignment(clusterState.nodes());
@ -232,15 +232,15 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
return "nodes_changed: " + event.nodesChanged() +
" nodes_removed:" + event.nodesRemoved() +
" routing_table_changed:" + event.routingTableChanged() +
" tasks: " + event.state().metaData().custom(PersistentTasksInProgress.TYPE);
" tasks: " + event.state().metaData().custom(PersistentTasks.TYPE);
}
private ClusterState significantChange(ClusterState clusterState) {
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
if (tasks != null) {
if (randomBoolean()) {
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
for (PersistentTask<?> task : tasks.tasks()) {
if (task.isAssigned() && clusterState.nodes().nodeExists(task.getExecutorNode())) {
logger.info("removed node {}", task.getExecutorNode());
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode()));
@ -255,11 +255,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
// we don't have any unassigned tasks - add some
if (randomBoolean()) {
logger.info("added random task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false);
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasks.builder(tasks), null, false);
tasksOrNodesChanged = true;
} else {
logger.info("added unassignable task with custom assignment message");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks),
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasks.builder(tasks),
new Assignment(null, "change me"), "never_assign", false);
tasksOrNodesChanged = true;
}
@ -282,11 +282,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
return builder.build();
}
private PersistentTasksInProgress removeTasksWithChangingAssignment(PersistentTasksInProgress tasks) {
private PersistentTasks removeTasksWithChangingAssignment(PersistentTasks tasks) {
if (tasks != null) {
boolean changed = false;
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
PersistentTasks.Builder tasksBuilder = PersistentTasks.builder(tasks);
for (PersistentTask<?> task : tasks.tasks()) {
// Remove all unassigned tasks that cause changing assignments they might trigger a significant change
if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) &&
"change me".equals(task.getAssignment().getExplanation())) {
@ -304,9 +304,9 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
private ClusterState insignificantChange(ClusterState clusterState) {
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
tasks = removeTasksWithChangingAssignment(tasks);
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
PersistentTasks.Builder tasksBuilder = PersistentTasks.builder(tasks);
if (randomBoolean()) {
if (hasAssignableTasks(tasks, clusterState.nodes()) == false) {
@ -328,7 +328,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
} else {
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
metaData.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build());
metaData.putCustom(PersistentTasks.TYPE, tasksBuilder.build());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
@ -350,18 +350,18 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
// clear the task
if (randomBoolean()) {
logger.info("removed all tasks");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE,
PersistentTasksInProgress.builder().build());
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE,
PersistentTasks.builder().build());
return builder.metaData(metaData).build();
} else {
logger.info("set task custom to null");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasksInProgress.TYPE);
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasks.TYPE);
return builder.metaData(metaData).build();
}
}
logger.info("removed all unassigned tasks and changed routing table");
if (tasks != null) {
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
for (PersistentTask<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) {
tasksBuilder.removeTask(task.getId());
}
@ -374,11 +374,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
.numberOfReplicas(1)
.build();
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).put(indexMetaData, false)
.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build());
.putCustom(PersistentTasks.TYPE, tasksBuilder.build());
return builder.metaData(metaData).build();
}
private boolean hasAssignableTasks(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) {
private boolean hasAssignableTasks(PersistentTasks tasks, DiscoveryNodes discoveryNodes) {
if (tasks == null || tasks.tasks().isEmpty()) {
return false;
}
@ -393,26 +393,26 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
});
}
private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) {
private boolean hasTasksAssignedTo(PersistentTasks tasks, String nodeId) {
return tasks != null && tasks.tasks().stream().anyMatch(
task -> nodeId.equals(task.getExecutorNode())) == false;
}
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
MetaData.Builder metaData, PersistentTasks.Builder tasks,
String node, boolean stopped) {
return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)),
randomAsciiOfLength(10), stopped);
}
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
MetaData.Builder metaData, PersistentTasks.Builder tasks,
Assignment assignment, String param, boolean stopped) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE,
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasks.TYPE,
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build()));
}
private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) {
private void addTask(PersistentTasks.Builder tasks, String action, String param, String node, boolean stopped) {
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action));
}

View File

@ -21,9 +21,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Builder;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.Builder;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
@ -37,12 +37,12 @@ import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
import static org.elasticsearch.xpack.persistent.TransportPersistentAction.NO_NODE_FOUND;
public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase<Custom> {
public class PersistentTasksTests extends AbstractDiffableSerializationTestCase<Custom> {
@Override
protected PersistentTasksInProgress createTestInstance() {
protected PersistentTasks createTestInstance() {
int numberOfTasks = randomInt(10);
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
PersistentTasks.Builder tasks = PersistentTasks.builder();
for (int i = 0; i < numberOfTasks; i++) {
boolean stopped = randomBoolean();
tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)),
@ -57,14 +57,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
@Override
protected Writeable.Reader<Custom> instanceReader() {
return PersistentTasksInProgress::new;
return PersistentTasks::new;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Arrays.asList(
new Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new),
new Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom),
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
new Entry(Task.Status.class, Status.NAME, Status::new)
));
@ -72,7 +72,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
@Override
protected Custom makeTestChanges(Custom testInstance) {
PersistentTasksInProgress tasksInProgress = (PersistentTasksInProgress) testInstance;
PersistentTasks tasksInProgress = (PersistentTasks) testInstance;
Builder builder = new Builder();
switch (randomInt(3)) {
case 0:
@ -105,12 +105,12 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
@Override
protected Writeable.Reader<Diff<Custom>> diffReader() {
return PersistentTasksInProgress::readDiffFrom;
return PersistentTasks::readDiffFrom;
}
@Override
protected PersistentTasksInProgress doParseInstance(XContentParser parser) throws IOException {
return PersistentTasksInProgress.fromXContent(parser);
protected PersistentTasks doParseInstance(XContentParser parser) throws IOException {
return PersistentTasks.fromXContent(parser);
}
@Override
@ -142,7 +142,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
return builder;
}
private long pickRandomTask(PersistentTasksInProgress testInstance) {
private long pickRandomTask(PersistentTasks testInstance) {
return randomFrom(new ArrayList<>(testInstance.tasks())).getId();
}
@ -157,9 +157,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
@SuppressWarnings("unchecked")
public void testSerializationContext() throws Exception {
PersistentTasksInProgress testInstance = createTestInstance();
PersistentTasks testInstance = createTestInstance();
for (int i = 0; i < randomInt(10); i++) {
testInstance = (PersistentTasksInProgress) makeTestChanges(testInstance);
testInstance = (PersistentTasks) makeTestChanges(testInstance);
}
ToXContent.MapParams params = new ToXContent.MapParams(
@ -170,12 +170,12 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
XContentBuilder shuffled = shuffleXContent(builder);
XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled.bytes());
PersistentTasksInProgress newInstance = doParseInstance(parser);
PersistentTasks newInstance = doParseInstance(parser);
assertNotSame(newInstance, testInstance);
assertEquals(testInstance.tasks().size(), newInstance.tasks().size());
for (PersistentTaskInProgress<?> testTask : testInstance.tasks()) {
PersistentTaskInProgress<TestRequest> newTask = (PersistentTaskInProgress<TestRequest>) newInstance.getTask(testTask.getId());
for (PersistentTask<?> testTask : testInstance.tasks()) {
PersistentTask<TestRequest> newTask = (PersistentTask<TestRequest>) newInstance.getTask(testTask.getId());
assertNotNull(newTask);
// Things that should be serialized
@ -192,14 +192,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
}
public void testBuilder() {
PersistentTasksInProgress persistentTasksInProgress = null;
PersistentTasks persistentTasks = null;
long lastKnownTask = -1;
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final Builder builder;
if (randomBoolean()) {
builder = new Builder();
} else {
builder = new Builder(persistentTasksInProgress);
builder = new Builder(persistentTasks);
}
boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
@ -220,7 +220,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
break;
case 2:
if (builder.hasTask(lastKnownTask)) {
PersistentTaskInProgress<?> task = builder.build().getTask(lastKnownTask);
PersistentTask<?> task = builder.build().getTask(lastKnownTask);
if (randomBoolean()) {
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
@ -263,7 +263,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
}
}
assertEquals(changed, builder.isChanged());
persistentTasksInProgress = builder.build();
persistentTasks = builder.build();
}
}

View File

@ -49,7 +49,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import java.io.IOException;
import java.util.ArrayList;
@ -107,8 +107,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
new NamedWriteableRegistry.Entry(Task.Status.class,
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom),
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
);
}
@ -116,8 +116,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE),
PersistentTasksInProgress::fromXContent),
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasks.TYPE),
PersistentTasks::fromXContent),
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
@ -368,7 +368,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
}
@Override
protected void nodeOperation(PersistentTask task, TestRequest request, ActionListener<Empty> listener) {
protected void nodeOperation(NodePersistentTask task, TestRequest request, ActionListener<Empty> listener) {
logger.info("started node operation for the task {}", task);
try {
TestTask testTask = (TestTask) task;
@ -451,7 +451,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
}
public static class TestTask extends PersistentTask {
public static class TestTask extends NodePersistentTask {
private volatile String operation;
public TestTask(long id, String type, String action, String description, TaskId parentTask) {