diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 76de682e99d..efd78cfaf75 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.core.ml; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.ClusterState; @@ -22,19 +21,12 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup; -import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.NameResolver; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; @@ -49,7 +41,6 @@ import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.function.Supplier; import java.util.stream.Collectors; public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { @@ -82,19 +73,10 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { return jobs; } - public boolean isGroupOrJob(String id) { - return groupOrJobLookup.isGroupOrJob(id); - } - public Set expandJobIds(String expression, boolean allowNoJobs) { return groupOrJobLookup.expandJobIds(expression, allowNoJobs); } - public boolean isJobDeleting(String jobId) { - Job job = jobs.get(jobId); - return job == null || job.isDeleting(); - } - public SortedMap getDatafeeds() { return datafeeds; } @@ -278,20 +260,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { return this; } - public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) { - checkJobHasNoDatafeed(jobId); - - JobState jobState = MlTasks.getJobState(jobId, tasks); - if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { - throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" + - JobState.CLOSED + " or " + JobState.FAILED + "]"); - } - Job job = jobs.remove(jobId); - if (job == null) { - throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); - } - if (job.isDeleting() == false) { - throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it hasn't marked as deleted"); + public Builder putJobs(Collection jobs) { + for (Job job : jobs) { + putJob(job, true); } return this; } @@ -300,6 +271,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { if (datafeeds.containsKey(datafeedConfig.getId())) { throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId()); } + String jobId = datafeedConfig.getJobId(); checkJobIsAvailableForDatafeed(jobId); Job job = jobs.get(jobId); @@ -331,54 +303,10 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { } } - public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, Map headers) { - String datafeedId = update.getId(); - DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId); - if (oldDatafeedConfig == null) { - throw ExceptionsHelper.missingDatafeedException(datafeedId); - } - checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, - DatafeedState.STARTED), datafeedId, persistentTasks); - DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, headers); - if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) { - checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId()); - } - Job job = jobs.get(newDatafeedConfig.getJobId()); - DatafeedJobValidator.validate(newDatafeedConfig, job); - datafeeds.put(datafeedId, newDatafeedConfig); - return this; - } - - public Builder removeDatafeed(String datafeedId, PersistentTasksCustomMetaData persistentTasks) { - DatafeedConfig datafeed = datafeeds.get(datafeedId); - if (datafeed == null) { - throw ExceptionsHelper.missingDatafeedException(datafeedId); - } - checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, - DatafeedState.STARTED), datafeedId, persistentTasks); - datafeeds.remove(datafeedId); - return this; - } - private Optional getDatafeedByJobId(String jobId) { return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); } - private void checkDatafeedIsStopped(Supplier msg, String datafeedId, PersistentTasksCustomMetaData persistentTasks) { - if (persistentTasks != null) { - if (persistentTasks.getTask(MlTasks.datafeedTaskId(datafeedId)) != null) { - throw ExceptionsHelper.conflictStatusException(msg.get()); - } - } - } - - public Builder putJobs(Collection jobs) { - for (Job job : jobs) { - putJob(job, true); - } - return this; - } - public Builder putDatafeeds(Collection datafeeds) { for (DatafeedConfig datafeed : datafeeds) { this.datafeeds.put(datafeed.getId(), datafeed); @@ -389,39 +317,6 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { public MlMetadata build() { return new MlMetadata(jobs, datafeeds); } - - public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { - Job job = jobs.get(jobId); - if (job == null) { - throw ExceptionsHelper.missingJobException(jobId); - } - if (job.isDeleting()) { - // Job still exists but is already being deleted - return; - } - - checkJobHasNoDatafeed(jobId); - - if (allowDeleteOpenJob == false) { - PersistentTask jobTask = MlTasks.getJobTask(jobId, tasks); - if (jobTask != null) { - JobTaskState jobTaskState = (JobTaskState) jobTask.getState(); - throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is " - + ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState())); - } - } - Job.Builder jobBuilder = new Job.Builder(job); - jobBuilder.setDeleting(true); - putJob(jobBuilder.build(), true); - } - - void checkJobHasNoDatafeed(String jobId) { - Optional datafeed = getDatafeedByJobId(jobId); - if (datafeed.isPresent()) { - throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" - + datafeed.get().getId() + "] refers to it"); - } - } } public static MlMetadata getMlMetadata(ClusterState state) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 73b1fe155fc..a21e6bc8c13 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -19,9 +19,6 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequest; -import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -184,51 +181,6 @@ public class JobConfigProvider { }, client::get); } - /** - * Get the list anomaly detector jobs specified by {@code jobIds}. - * - * WARNING: errors are silently ignored, if a job is not found a - * {@code ResourceNotFoundException} is not thrown. Only found - * jobs are returned, this size of the returned jobs list could - * be different to the size of the requested ids list. - * - * @param jobIds The jobs to get - * @param listener Jobs listener - */ - public void getJobs(List jobIds, ActionListener> listener) { - MultiGetRequest multiGetRequest = new MultiGetRequest(); - jobIds.forEach(jobId -> multiGetRequest.add(AnomalyDetectorsIndex.configIndexName(), - ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId))); - - List jobs = new ArrayList<>(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, multiGetRequest, new ActionListener() { - @Override - public void onResponse(MultiGetResponse multiGetResponse) { - - MultiGetItemResponse[] responses = multiGetResponse.getResponses(); - for (MultiGetItemResponse response : responses) { - GetResponse getResponse = response.getResponse(); - if (getResponse.isExists()) { - BytesReference source = getResponse.getSourceAsBytesRef(); - try { - Job.Builder job = parseJobLenientlyFromSource(source); - jobs.add(job); - } catch (IOException e) { - logger.error("Error parsing job configuration [" + response.getId() + "]"); - } - } - } - - listener.onResponse(jobs); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, client::multiGet); - } - /** * Delete the anomaly detector job config document. * {@code errorIfMissing} controls whether or not an error is returned diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 2377bc5921e..637b1089d9e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -5,44 +5,30 @@ */ package org.elasticsearch.xpack.ml; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.MlTasks; -import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; +import java.util.ArrayList; import java.util.Collections; -import java.util.Date; -import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; -import static org.elasticsearch.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; -import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -122,277 +108,6 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("2"), sameInstance(job2Attempt2)); } - public void testRemoveJob() { - Job.Builder jobBuilder = buildJobBuilder("1"); - jobBuilder.setDeleting(true); - Job job1 = jobBuilder.build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - - MlMetadata result = builder.build(); - assertThat(result.getJobs().get("1"), sameInstance(job1)); - assertThat(result.getDatafeeds().get("1"), nullValue()); - - builder = new MlMetadata.Builder(result); - assertThat(result.getJobs().get("1"), sameInstance(job1)); - assertThat(result.getDatafeeds().get("1"), nullValue()); - - builder.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.emptyMap())); - result = builder.build(); - assertThat(result.getJobs().get("1"), nullValue()); - assertThat(result.getDatafeeds().get("1"), nullValue()); - } - - public void testRemoveJob_failBecauseJobIsOpen() { - Job job1 = buildJobBuilder("1").build(); - MlMetadata.Builder builder1 = new MlMetadata.Builder(); - builder1.putJob(job1, false); - - MlMetadata result = builder1.build(); - assertThat(result.getJobs().get("1"), sameInstance(job1)); - assertThat(result.getDatafeeds().get("1"), nullValue()); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("1", null, JobState.CLOSED, tasksBuilder); - MlMetadata.Builder builder2 = new MlMetadata.Builder(result); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder2.deleteJob("1", tasksBuilder.build())); - assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - } - - public void testRemoveJob_failDatafeedRefersToJob() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))); - assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - String expectedMsg = "Cannot delete job [" + job1.getId() + "] because datafeed [" + datafeedConfig1.getId() + "] refers to it"; - assertThat(e.getMessage(), equalTo(expectedMsg)); - } - - public void testRemoveJob_failBecauseJobDoesNotExist() { - MlMetadata.Builder builder1 = new MlMetadata.Builder(); - expectThrows(ResourceNotFoundException.class, - () -> builder1.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))); - } - - public void testCrudDatafeed() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - - MlMetadata result = builder.build(); - assertThat(result.getJobs().get("job_id"), sameInstance(job1)); - assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); - - builder = new MlMetadata.Builder(result); - builder.removeDatafeed("datafeed1", new PersistentTasksCustomMetaData(0, Collections.emptyMap())); - result = builder.build(); - assertThat(result.getJobs().get("job_id"), sameInstance(job1)); - assertThat(result.getDatafeeds().get("datafeed1"), nullValue()); - } - - public void testPutDatafeed_failBecauseJobDoesNotExist() { - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", "missing-job").build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); - } - - public void testPutDatafeed_failBecauseJobIsBeingDeleted() { - Job job1 = createDatafeedJob().setDeleting(true).build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); - } - - public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - - expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); - } - - public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder.putDatafeed(datafeedConfig2, Collections.emptyMap())); - assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - } - - public void testPutDatafeed_failBecauseJobIsNotCompatibleForDatafeed() { - Job.Builder job1 = createDatafeedJob(); - Date now = new Date(); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build(now).getAnalysisConfig()); - analysisConfig.setLatency(TimeValue.timeValueHours(1)); - job1.setAnalysisConfig(analysisConfig); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1.build(now), false); - - expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); - } - - public void testPutDatafeed_setsSecurityHeaders() { - Job datafeedJob = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(datafeedJob, false); - - Map headers = new HashMap<>(); - headers.put("unrelated_header", "unrelated_header_value"); - headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"); - builder.putDatafeed(datafeedConfig, headers); - MlMetadata metadata = builder.build(); - assertThat(metadata.getDatafeed("datafeed1").getHeaders().size(), equalTo(1)); - assertThat(metadata.getDatafeed("datafeed1").getHeaders(), - hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user")); - } - - public void testUpdateDatafeed() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - MlMetadata beforeMetadata = builder.build(); - - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); - update.setScrollSize(5000); - MlMetadata updatedMetadata = - new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap()).build(); - - DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId()); - assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId())); - assertThat(updatedDatafeed.getIndices(), equalTo(datafeedConfig1.getIndices())); - assertThat(updatedDatafeed.getScrollSize(), equalTo(5000)); - } - - public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() { - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id"); - update.setScrollSize(5000); - expectThrows(ResourceNotFoundException.class, - () -> new MlMetadata.Builder().updateDatafeed(update.build(), null, Collections.emptyMap()).build()); - } - - public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - MlMetadata beforeMetadata = builder.build(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L); - tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), MlTasks.DATAFEED_TASK_NAME, params, INITIAL_ASSIGNMENT); - PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build(); - - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); - update.setScrollSize(5000); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), tasksInProgress, null)); - assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - } - - public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - MlMetadata beforeMetadata = builder.build(); - - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); - update.setJobId(job1.getId() + "_2"); - - expectThrows(ResourceNotFoundException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap())); - } - - public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { - Job job1 = createDatafeedJob().build(new Date()); - Job.Builder job2 = new Job.Builder(job1); - job2.setId(job1.getId() + "_2"); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job2.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putJob(job2.build(), false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - builder.putDatafeed(datafeedConfig2, Collections.emptyMap()); - MlMetadata beforeMetadata = builder.build(); - - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); - update.setJobId(job2.getId()); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap())); - assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]")); - } - - public void testUpdateDatafeed_setsSecurityHeaders() { - Job datafeedJob = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(datafeedJob, false); - builder.putDatafeed(datafeedConfig, Collections.emptyMap()); - MlMetadata beforeMetadata = builder.build(); - assertTrue(beforeMetadata.getDatafeed("datafeed1").getHeaders().isEmpty()); - - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig.getId()); - update.setQueryDelay(TimeValue.timeValueMinutes(5)); - - Map headers = new HashMap<>(); - headers.put("unrelated_header", "unrelated_header_value"); - headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"); - MlMetadata afterMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, headers).build(); - Map updatedHeaders = afterMetadata.getDatafeed("datafeed1").getHeaders(); - assertThat(updatedHeaders.size(), equalTo(1)); - assertThat(updatedHeaders, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user")); - } - - public void testRemoveDatafeed_failBecauseDatafeedStarted() { - Job job1 = createDatafeedJob().build(new Date()); - DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); - MlMetadata.Builder builder = new MlMetadata.Builder(); - builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - - MlMetadata result = builder.build(); - assertThat(result.getJobs().get("job_id"), sameInstance(job1)); - assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed1", 0L); - tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), MlTasks.DATAFEED_TASK_NAME, params, INITIAL_ASSIGNMENT); - PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build(); - - MlMetadata.Builder builder2 = new MlMetadata.Builder(result); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder2.removeDatafeed("datafeed1", tasksInProgress)); - assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - } - public void testExpandJobIds() { MlMetadata mlMetadata = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2").build(); @@ -404,12 +119,13 @@ public class MlMetadataTests extends AbstractSerializingTestCase { public void testExpandDatafeedIds() { MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2"); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), Collections.emptyMap()); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), Collections.emptyMap()); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), Collections.emptyMap()); + List datafeeds = new ArrayList<>(); + datafeeds.add(createDatafeedConfig("bar-1-feed", "bar-1").build()); + datafeeds.add(createDatafeedConfig("foo-1-feed", "foo-1").build()); + datafeeds.add(createDatafeedConfig("foo-2-feed", "foo-2").build()); + mlMetadataBuilder.putDatafeeds(datafeeds); MlMetadata mlMetadata = mlMetadataBuilder.build(); - assertThat(mlMetadata.expandDatafeedIds("_all", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); assertThat(mlMetadata.expandDatafeedIds("*", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); assertThat(mlMetadata.expandDatafeedIds("foo-*", false), contains("foo-1-feed", "foo-2-feed")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index c19e8b12295..0266247714d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -186,23 +186,6 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase { assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteJobResponseHolder.get().getResult()); } - public void testGetJobs() throws Exception { - putJob(createJob("nginx", null)); - putJob(createJob("tomcat", null)); - putJob(createJob("mysql", null)); - - List jobsToGet = Arrays.asList("nginx", "tomcat", "unknown-job"); - - AtomicReference> jobsHolder = new AtomicReference<>(); - AtomicReference exceptionHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.getJobs(jobsToGet, actionListener), jobsHolder, exceptionHolder); - assertNull(exceptionHolder.get()); - assertNotNull(jobsHolder.get()); - assertThat(jobsHolder.get(), hasSize(2)); - List foundIds = jobsHolder.get().stream().map(Job.Builder::getId).collect(Collectors.toList()); - assertThat(foundIds, containsInAnyOrder("nginx", "tomcat")); - } - public void testUpdateWithAValidationError() throws Exception { final String jobId = "bad-update-job";