mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
[ML] Remove unused code from the JIndex project (#37477)
This commit is contained in:
parent
44e83f30e2
commit
7c11b05c28
x-pack/plugin
core/src/main/java/org/elasticsearch/xpack/core/ml
ml/src
main/java/org/elasticsearch/xpack/ml/job/persistence
test/java/org/elasticsearch/xpack/ml
@ -5,7 +5,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.core.ml;
|
package org.elasticsearch.xpack.core.ml;
|
||||||
|
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.AbstractDiffable;
|
import org.elasticsearch.cluster.AbstractDiffable;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
@ -22,19 +21,12 @@ import org.elasticsearch.common.io.stream.Writeable;
|
|||||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
|
||||||
import org.elasticsearch.xpack.core.ClientHelper;
|
import org.elasticsearch.xpack.core.ClientHelper;
|
||||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup;
|
import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup;
|
||||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
|
||||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||||
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
|
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
|
||||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||||
@ -49,7 +41,6 @@ import java.util.Optional;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.function.Supplier;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
||||||
@ -82,19 +73,10 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||||||
return jobs;
|
return jobs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isGroupOrJob(String id) {
|
|
||||||
return groupOrJobLookup.isGroupOrJob(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
|
public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
|
||||||
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
|
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isJobDeleting(String jobId) {
|
|
||||||
Job job = jobs.get(jobId);
|
|
||||||
return job == null || job.isDeleting();
|
|
||||||
}
|
|
||||||
|
|
||||||
public SortedMap<String, DatafeedConfig> getDatafeeds() {
|
public SortedMap<String, DatafeedConfig> getDatafeeds() {
|
||||||
return datafeeds;
|
return datafeeds;
|
||||||
}
|
}
|
||||||
@ -278,20 +260,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
|
public Builder putJobs(Collection<Job> jobs) {
|
||||||
checkJobHasNoDatafeed(jobId);
|
for (Job job : jobs) {
|
||||||
|
putJob(job, true);
|
||||||
JobState jobState = MlTasks.getJobState(jobId, tasks);
|
|
||||||
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
|
|
||||||
throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" +
|
|
||||||
JobState.CLOSED + " or " + JobState.FAILED + "]");
|
|
||||||
}
|
|
||||||
Job job = jobs.remove(jobId);
|
|
||||||
if (job == null) {
|
|
||||||
throw new ResourceNotFoundException("job [" + jobId + "] does not exist");
|
|
||||||
}
|
|
||||||
if (job.isDeleting() == false) {
|
|
||||||
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it hasn't marked as deleted");
|
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -300,6 +271,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||||||
if (datafeeds.containsKey(datafeedConfig.getId())) {
|
if (datafeeds.containsKey(datafeedConfig.getId())) {
|
||||||
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
|
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
String jobId = datafeedConfig.getJobId();
|
String jobId = datafeedConfig.getJobId();
|
||||||
checkJobIsAvailableForDatafeed(jobId);
|
checkJobIsAvailableForDatafeed(jobId);
|
||||||
Job job = jobs.get(jobId);
|
Job job = jobs.get(jobId);
|
||||||
@ -331,54 +303,10 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, Map<String, String> headers) {
|
|
||||||
String datafeedId = update.getId();
|
|
||||||
DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId);
|
|
||||||
if (oldDatafeedConfig == null) {
|
|
||||||
throw ExceptionsHelper.missingDatafeedException(datafeedId);
|
|
||||||
}
|
|
||||||
checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId,
|
|
||||||
DatafeedState.STARTED), datafeedId, persistentTasks);
|
|
||||||
DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, headers);
|
|
||||||
if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) {
|
|
||||||
checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId());
|
|
||||||
}
|
|
||||||
Job job = jobs.get(newDatafeedConfig.getJobId());
|
|
||||||
DatafeedJobValidator.validate(newDatafeedConfig, job);
|
|
||||||
datafeeds.put(datafeedId, newDatafeedConfig);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder removeDatafeed(String datafeedId, PersistentTasksCustomMetaData persistentTasks) {
|
|
||||||
DatafeedConfig datafeed = datafeeds.get(datafeedId);
|
|
||||||
if (datafeed == null) {
|
|
||||||
throw ExceptionsHelper.missingDatafeedException(datafeedId);
|
|
||||||
}
|
|
||||||
checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
|
|
||||||
DatafeedState.STARTED), datafeedId, persistentTasks);
|
|
||||||
datafeeds.remove(datafeedId);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
|
private Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
|
||||||
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
|
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksCustomMetaData persistentTasks) {
|
|
||||||
if (persistentTasks != null) {
|
|
||||||
if (persistentTasks.getTask(MlTasks.datafeedTaskId(datafeedId)) != null) {
|
|
||||||
throw ExceptionsHelper.conflictStatusException(msg.get());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder putJobs(Collection<Job> jobs) {
|
|
||||||
for (Job job : jobs) {
|
|
||||||
putJob(job, true);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
|
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
|
||||||
for (DatafeedConfig datafeed : datafeeds) {
|
for (DatafeedConfig datafeed : datafeeds) {
|
||||||
this.datafeeds.put(datafeed.getId(), datafeed);
|
this.datafeeds.put(datafeed.getId(), datafeed);
|
||||||
@ -389,39 +317,6 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||||||
public MlMetadata build() {
|
public MlMetadata build() {
|
||||||
return new MlMetadata(jobs, datafeeds);
|
return new MlMetadata(jobs, datafeeds);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {
|
|
||||||
Job job = jobs.get(jobId);
|
|
||||||
if (job == null) {
|
|
||||||
throw ExceptionsHelper.missingJobException(jobId);
|
|
||||||
}
|
|
||||||
if (job.isDeleting()) {
|
|
||||||
// Job still exists but is already being deleted
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
checkJobHasNoDatafeed(jobId);
|
|
||||||
|
|
||||||
if (allowDeleteOpenJob == false) {
|
|
||||||
PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
|
|
||||||
if (jobTask != null) {
|
|
||||||
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
|
|
||||||
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
|
|
||||||
+ ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Job.Builder jobBuilder = new Job.Builder(job);
|
|
||||||
jobBuilder.setDeleting(true);
|
|
||||||
putJob(jobBuilder.build(), true);
|
|
||||||
}
|
|
||||||
|
|
||||||
void checkJobHasNoDatafeed(String jobId) {
|
|
||||||
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
|
|
||||||
if (datafeed.isPresent()) {
|
|
||||||
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
|
|
||||||
+ datafeed.get().getId() + "] refers to it");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MlMetadata getMlMetadata(ClusterState state) {
|
public static MlMetadata getMlMetadata(ClusterState state) {
|
||||||
|
@ -19,9 +19,6 @@ import org.elasticsearch.action.delete.DeleteResponse;
|
|||||||
import org.elasticsearch.action.get.GetAction;
|
import org.elasticsearch.action.get.GetAction;
|
||||||
import org.elasticsearch.action.get.GetRequest;
|
import org.elasticsearch.action.get.GetRequest;
|
||||||
import org.elasticsearch.action.get.GetResponse;
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
|
||||||
import org.elasticsearch.action.get.MultiGetRequest;
|
|
||||||
import org.elasticsearch.action.get.MultiGetResponse;
|
|
||||||
import org.elasticsearch.action.index.IndexAction;
|
import org.elasticsearch.action.index.IndexAction;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
@ -184,51 +181,6 @@ public class JobConfigProvider {
|
|||||||
}, client::get);
|
}, client::get);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the list anomaly detector jobs specified by {@code jobIds}.
|
|
||||||
*
|
|
||||||
* WARNING: errors are silently ignored, if a job is not found a
|
|
||||||
* {@code ResourceNotFoundException} is not thrown. Only found
|
|
||||||
* jobs are returned, this size of the returned jobs list could
|
|
||||||
* be different to the size of the requested ids list.
|
|
||||||
*
|
|
||||||
* @param jobIds The jobs to get
|
|
||||||
* @param listener Jobs listener
|
|
||||||
*/
|
|
||||||
public void getJobs(List<String> jobIds, ActionListener<List<Job.Builder>> listener) {
|
|
||||||
MultiGetRequest multiGetRequest = new MultiGetRequest();
|
|
||||||
jobIds.forEach(jobId -> multiGetRequest.add(AnomalyDetectorsIndex.configIndexName(),
|
|
||||||
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)));
|
|
||||||
|
|
||||||
List<Job.Builder> jobs = new ArrayList<>();
|
|
||||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, multiGetRequest, new ActionListener<MultiGetResponse>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(MultiGetResponse multiGetResponse) {
|
|
||||||
|
|
||||||
MultiGetItemResponse[] responses = multiGetResponse.getResponses();
|
|
||||||
for (MultiGetItemResponse response : responses) {
|
|
||||||
GetResponse getResponse = response.getResponse();
|
|
||||||
if (getResponse.isExists()) {
|
|
||||||
BytesReference source = getResponse.getSourceAsBytesRef();
|
|
||||||
try {
|
|
||||||
Job.Builder job = parseJobLenientlyFromSource(source);
|
|
||||||
jobs.add(job);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("Error parsing job configuration [" + response.getId() + "]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
listener.onResponse(jobs);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
listener.onFailure(e);
|
|
||||||
}
|
|
||||||
}, client::multiGet);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete the anomaly detector job config document.
|
* Delete the anomaly detector job config document.
|
||||||
* {@code errorIfMissing} controls whether or not an error is returned
|
* {@code errorIfMissing} controls whether or not an error is returned
|
||||||
|
@ -5,44 +5,30 @@
|
|||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml;
|
package org.elasticsearch.xpack.ml;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchStatusException;
|
|
||||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
|
||||||
import org.elasticsearch.search.SearchModule;
|
import org.elasticsearch.search.SearchModule;
|
||||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
|
||||||
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
|
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
|
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|
||||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
|
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.List;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
|
import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
|
||||||
import static org.elasticsearch.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
|
|
||||||
import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
|
|
||||||
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
|
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
|
||||||
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
|
|
||||||
import static org.hamcrest.Matchers.contains;
|
import static org.hamcrest.Matchers.contains;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.hasEntry;
|
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.hamcrest.Matchers.sameInstance;
|
import static org.hamcrest.Matchers.sameInstance;
|
||||||
|
|
||||||
@ -122,277 +108,6 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
|
|||||||
assertThat(result.getJobs().get("2"), sameInstance(job2Attempt2));
|
assertThat(result.getJobs().get("2"), sameInstance(job2Attempt2));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRemoveJob() {
|
|
||||||
Job.Builder jobBuilder = buildJobBuilder("1");
|
|
||||||
jobBuilder.setDeleting(true);
|
|
||||||
Job job1 = jobBuilder.build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
|
|
||||||
MlMetadata result = builder.build();
|
|
||||||
assertThat(result.getJobs().get("1"), sameInstance(job1));
|
|
||||||
assertThat(result.getDatafeeds().get("1"), nullValue());
|
|
||||||
|
|
||||||
builder = new MlMetadata.Builder(result);
|
|
||||||
assertThat(result.getJobs().get("1"), sameInstance(job1));
|
|
||||||
assertThat(result.getDatafeeds().get("1"), nullValue());
|
|
||||||
|
|
||||||
builder.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.emptyMap()));
|
|
||||||
result = builder.build();
|
|
||||||
assertThat(result.getJobs().get("1"), nullValue());
|
|
||||||
assertThat(result.getDatafeeds().get("1"), nullValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testRemoveJob_failBecauseJobIsOpen() {
|
|
||||||
Job job1 = buildJobBuilder("1").build();
|
|
||||||
MlMetadata.Builder builder1 = new MlMetadata.Builder();
|
|
||||||
builder1.putJob(job1, false);
|
|
||||||
|
|
||||||
MlMetadata result = builder1.build();
|
|
||||||
assertThat(result.getJobs().get("1"), sameInstance(job1));
|
|
||||||
assertThat(result.getDatafeeds().get("1"), nullValue());
|
|
||||||
|
|
||||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
|
||||||
addJobTask("1", null, JobState.CLOSED, tasksBuilder);
|
|
||||||
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
|
|
||||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
|
||||||
() -> builder2.deleteJob("1", tasksBuilder.build()));
|
|
||||||
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testRemoveJob_failDatafeedRefersToJob() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
|
|
||||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
|
||||||
() -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap())));
|
|
||||||
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
|
|
||||||
String expectedMsg = "Cannot delete job [" + job1.getId() + "] because datafeed [" + datafeedConfig1.getId() + "] refers to it";
|
|
||||||
assertThat(e.getMessage(), equalTo(expectedMsg));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testRemoveJob_failBecauseJobDoesNotExist() {
|
|
||||||
MlMetadata.Builder builder1 = new MlMetadata.Builder();
|
|
||||||
expectThrows(ResourceNotFoundException.class,
|
|
||||||
() -> builder1.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.emptyMap())));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testCrudDatafeed() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
|
|
||||||
MlMetadata result = builder.build();
|
|
||||||
assertThat(result.getJobs().get("job_id"), sameInstance(job1));
|
|
||||||
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
|
|
||||||
|
|
||||||
builder = new MlMetadata.Builder(result);
|
|
||||||
builder.removeDatafeed("datafeed1", new PersistentTasksCustomMetaData(0, Collections.emptyMap()));
|
|
||||||
result = builder.build();
|
|
||||||
assertThat(result.getJobs().get("job_id"), sameInstance(job1));
|
|
||||||
assertThat(result.getDatafeeds().get("datafeed1"), nullValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutDatafeed_failBecauseJobDoesNotExist() {
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", "missing-job").build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
|
|
||||||
expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutDatafeed_failBecauseJobIsBeingDeleted() {
|
|
||||||
Job job1 = createDatafeedJob().setDeleting(true).build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
|
|
||||||
expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
|
|
||||||
expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
|
|
||||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
|
||||||
() -> builder.putDatafeed(datafeedConfig2, Collections.emptyMap()));
|
|
||||||
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutDatafeed_failBecauseJobIsNotCompatibleForDatafeed() {
|
|
||||||
Job.Builder job1 = createDatafeedJob();
|
|
||||||
Date now = new Date();
|
|
||||||
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build(now).getAnalysisConfig());
|
|
||||||
analysisConfig.setLatency(TimeValue.timeValueHours(1));
|
|
||||||
job1.setAnalysisConfig(analysisConfig);
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1.build(now), false);
|
|
||||||
|
|
||||||
expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutDatafeed_setsSecurityHeaders() {
|
|
||||||
Job datafeedJob = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(datafeedJob, false);
|
|
||||||
|
|
||||||
Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("unrelated_header", "unrelated_header_value");
|
|
||||||
headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user");
|
|
||||||
builder.putDatafeed(datafeedConfig, headers);
|
|
||||||
MlMetadata metadata = builder.build();
|
|
||||||
assertThat(metadata.getDatafeed("datafeed1").getHeaders().size(), equalTo(1));
|
|
||||||
assertThat(metadata.getDatafeed("datafeed1").getHeaders(),
|
|
||||||
hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateDatafeed() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
MlMetadata beforeMetadata = builder.build();
|
|
||||||
|
|
||||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
|
|
||||||
update.setScrollSize(5000);
|
|
||||||
MlMetadata updatedMetadata =
|
|
||||||
new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap()).build();
|
|
||||||
|
|
||||||
DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId());
|
|
||||||
assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId()));
|
|
||||||
assertThat(updatedDatafeed.getIndices(), equalTo(datafeedConfig1.getIndices()));
|
|
||||||
assertThat(updatedDatafeed.getScrollSize(), equalTo(5000));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() {
|
|
||||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id");
|
|
||||||
update.setScrollSize(5000);
|
|
||||||
expectThrows(ResourceNotFoundException.class,
|
|
||||||
() -> new MlMetadata.Builder().updateDatafeed(update.build(), null, Collections.emptyMap()).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
MlMetadata beforeMetadata = builder.build();
|
|
||||||
|
|
||||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
|
||||||
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L);
|
|
||||||
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), MlTasks.DATAFEED_TASK_NAME, params, INITIAL_ASSIGNMENT);
|
|
||||||
PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build();
|
|
||||||
|
|
||||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
|
|
||||||
update.setScrollSize(5000);
|
|
||||||
|
|
||||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
|
||||||
() -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), tasksInProgress, null));
|
|
||||||
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
MlMetadata beforeMetadata = builder.build();
|
|
||||||
|
|
||||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
|
|
||||||
update.setJobId(job1.getId() + "_2");
|
|
||||||
|
|
||||||
expectThrows(ResourceNotFoundException.class,
|
|
||||||
() -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
Job.Builder job2 = new Job.Builder(job1);
|
|
||||||
job2.setId(job1.getId() + "_2");
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job2.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putJob(job2.build(), false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
builder.putDatafeed(datafeedConfig2, Collections.emptyMap());
|
|
||||||
MlMetadata beforeMetadata = builder.build();
|
|
||||||
|
|
||||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
|
|
||||||
update.setJobId(job2.getId());
|
|
||||||
|
|
||||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
|
||||||
() -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap()));
|
|
||||||
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
|
|
||||||
assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateDatafeed_setsSecurityHeaders() {
|
|
||||||
Job datafeedJob = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(datafeedJob, false);
|
|
||||||
builder.putDatafeed(datafeedConfig, Collections.emptyMap());
|
|
||||||
MlMetadata beforeMetadata = builder.build();
|
|
||||||
assertTrue(beforeMetadata.getDatafeed("datafeed1").getHeaders().isEmpty());
|
|
||||||
|
|
||||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig.getId());
|
|
||||||
update.setQueryDelay(TimeValue.timeValueMinutes(5));
|
|
||||||
|
|
||||||
Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("unrelated_header", "unrelated_header_value");
|
|
||||||
headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user");
|
|
||||||
MlMetadata afterMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, headers).build();
|
|
||||||
Map<String, String> updatedHeaders = afterMetadata.getDatafeed("datafeed1").getHeaders();
|
|
||||||
assertThat(updatedHeaders.size(), equalTo(1));
|
|
||||||
assertThat(updatedHeaders, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testRemoveDatafeed_failBecauseDatafeedStarted() {
|
|
||||||
Job job1 = createDatafeedJob().build(new Date());
|
|
||||||
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
|
|
||||||
MlMetadata.Builder builder = new MlMetadata.Builder();
|
|
||||||
builder.putJob(job1, false);
|
|
||||||
builder.putDatafeed(datafeedConfig1, Collections.emptyMap());
|
|
||||||
|
|
||||||
MlMetadata result = builder.build();
|
|
||||||
assertThat(result.getJobs().get("job_id"), sameInstance(job1));
|
|
||||||
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
|
|
||||||
|
|
||||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
|
||||||
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed1", 0L);
|
|
||||||
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), MlTasks.DATAFEED_TASK_NAME, params, INITIAL_ASSIGNMENT);
|
|
||||||
PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build();
|
|
||||||
|
|
||||||
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
|
|
||||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
|
||||||
() -> builder2.removeDatafeed("datafeed1", tasksInProgress));
|
|
||||||
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testExpandJobIds() {
|
public void testExpandJobIds() {
|
||||||
MlMetadata mlMetadata = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2").build();
|
MlMetadata mlMetadata = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2").build();
|
||||||
|
|
||||||
@ -404,12 +119,13 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
|
|||||||
|
|
||||||
public void testExpandDatafeedIds() {
|
public void testExpandDatafeedIds() {
|
||||||
MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2");
|
MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2");
|
||||||
mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), Collections.emptyMap());
|
List<DatafeedConfig> datafeeds = new ArrayList<>();
|
||||||
mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), Collections.emptyMap());
|
datafeeds.add(createDatafeedConfig("bar-1-feed", "bar-1").build());
|
||||||
mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), Collections.emptyMap());
|
datafeeds.add(createDatafeedConfig("foo-1-feed", "foo-1").build());
|
||||||
|
datafeeds.add(createDatafeedConfig("foo-2-feed", "foo-2").build());
|
||||||
|
mlMetadataBuilder.putDatafeeds(datafeeds);
|
||||||
MlMetadata mlMetadata = mlMetadataBuilder.build();
|
MlMetadata mlMetadata = mlMetadataBuilder.build();
|
||||||
|
|
||||||
|
|
||||||
assertThat(mlMetadata.expandDatafeedIds("_all", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed"));
|
assertThat(mlMetadata.expandDatafeedIds("_all", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed"));
|
||||||
assertThat(mlMetadata.expandDatafeedIds("*", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed"));
|
assertThat(mlMetadata.expandDatafeedIds("*", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed"));
|
||||||
assertThat(mlMetadata.expandDatafeedIds("foo-*", false), contains("foo-1-feed", "foo-2-feed"));
|
assertThat(mlMetadata.expandDatafeedIds("foo-*", false), contains("foo-1-feed", "foo-2-feed"));
|
||||||
|
@ -186,23 +186,6 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
|
|||||||
assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteJobResponseHolder.get().getResult());
|
assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteJobResponseHolder.get().getResult());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetJobs() throws Exception {
|
|
||||||
putJob(createJob("nginx", null));
|
|
||||||
putJob(createJob("tomcat", null));
|
|
||||||
putJob(createJob("mysql", null));
|
|
||||||
|
|
||||||
List<String> jobsToGet = Arrays.asList("nginx", "tomcat", "unknown-job");
|
|
||||||
|
|
||||||
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
|
|
||||||
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
|
|
||||||
blockingCall(actionListener -> jobConfigProvider.getJobs(jobsToGet, actionListener), jobsHolder, exceptionHolder);
|
|
||||||
assertNull(exceptionHolder.get());
|
|
||||||
assertNotNull(jobsHolder.get());
|
|
||||||
assertThat(jobsHolder.get(), hasSize(2));
|
|
||||||
List<String> foundIds = jobsHolder.get().stream().map(Job.Builder::getId).collect(Collectors.toList());
|
|
||||||
assertThat(foundIds, containsInAnyOrder("nginx", "tomcat"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUpdateWithAValidationError() throws Exception {
|
public void testUpdateWithAValidationError() throws Exception {
|
||||||
final String jobId = "bad-update-job";
|
final String jobId = "bad-update-job";
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user