[ML] Check calendar exists before removing job (elastic/x-pack-elasticsearch#3661)
Also, removes check for whether a job-to-remove exists and replaces it with a check of whether a job-to-remove is already present in the calendar. This allows to remove a job that may no longer exists and it improves feedback for the case that an existing job is removed from a calendar that doesn't contain it. relates elastic/x-pack-elasticsearch#3620 Original commit: elastic/x-pack-elasticsearch@3ea39be1b6
This commit is contained in:
parent
129f021843
commit
cc432aee34
|
@ -67,7 +67,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||||
import org.elasticsearch.search.sort.SortBuilders;
|
import org.elasticsearch.search.sort.SortBuilders;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
|
import org.elasticsearch.xpack.ml.MLMetadataField;
|
||||||
import org.elasticsearch.xpack.ml.MlMetaIndex;
|
import org.elasticsearch.xpack.ml.MlMetaIndex;
|
||||||
|
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||||
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
|
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
|
||||||
import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
|
import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
|
||||||
import org.elasticsearch.xpack.ml.action.GetInfluencersAction;
|
import org.elasticsearch.xpack.ml.action.GetInfluencersAction;
|
||||||
|
@ -1081,12 +1083,29 @@ public class JobProvider {
|
||||||
result -> handler.accept(result.result), errorHandler, () -> null);
|
result -> handler.accept(result.result), errorHandler, () -> null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
|
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove, ClusterState clusterState,
|
||||||
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {
|
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {
|
||||||
|
|
||||||
ActionListener<Calendar> getCalendarListener = ActionListener.wrap(
|
ActionListener<Calendar> getCalendarListener = ActionListener.wrap(
|
||||||
calendar -> {
|
calendar -> {
|
||||||
Set<String> currentJobs = new HashSet<>(calendar.getJobIds());
|
Set<String> currentJobs = new HashSet<>(calendar.getJobIds());
|
||||||
|
|
||||||
|
for (String jobToAdd : jobIdsToAdd) {
|
||||||
|
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
|
||||||
|
if (mlMetadata.isGroupOrJob(jobToAdd) == false) {
|
||||||
|
errorHandler.accept(ExceptionsHelper.missingJobException(jobToAdd));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String jobToRemove : jobIdsToRemove) {
|
||||||
|
if (currentJobs.contains(jobToRemove) == false) {
|
||||||
|
errorHandler.accept(ExceptionsHelper.badRequestException("Cannot remove [" + jobToRemove
|
||||||
|
+ "] as it is not present in calendar [" + calendarId + "]"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
currentJobs.addAll(jobIdsToAdd);
|
currentJobs.addAll(jobIdsToAdd);
|
||||||
currentJobs.removeAll(jobIdsToRemove);
|
currentJobs.removeAll(jobIdsToRemove);
|
||||||
Calendar updatedCalendar = new Calendar(calendar.getId(), new ArrayList<>(currentJobs), calendar.getDescription());
|
Calendar updatedCalendar = new Calendar(calendar.getId(), new ArrayList<>(currentJobs), calendar.getDescription());
|
||||||
|
|
|
@ -8,18 +8,14 @@ package org.elasticsearch.xpack.ml.action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.HandledTransportAction;
|
import org.elasticsearch.action.support.HandledTransportAction;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.xpack.ml.MLMetadataField;
|
|
||||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
|
||||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
|
||||||
|
|
||||||
public class TransportUpdateCalendarJobAction extends HandledTransportAction<UpdateCalendarJobAction.Request, PutCalendarAction.Response> {
|
public class TransportUpdateCalendarJobAction extends HandledTransportAction<UpdateCalendarJobAction.Request, PutCalendarAction.Response> {
|
||||||
|
|
||||||
|
@ -41,23 +37,7 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction<Upd
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
|
protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
|
||||||
ClusterState state = clusterService.state();
|
jobProvider.updateCalendar(request.getCalendarId(), request.getJobIdsToAdd(), request.getJobIdsToRemove(), clusterService.state(),
|
||||||
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);
|
|
||||||
for (String jobToAdd: request.getJobIdsToAdd()) {
|
|
||||||
if (mlMetadata.isGroupOrJob(jobToAdd) == false) {
|
|
||||||
listener.onFailure(ExceptionsHelper.missingJobException(jobToAdd));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (String jobToRemove: request.getJobIdsToRemove()) {
|
|
||||||
if (mlMetadata.isGroupOrJob(jobToRemove) == false) {
|
|
||||||
listener.onFailure(ExceptionsHelper.missingJobException(jobToRemove));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jobProvider.updateCalendar(request.getCalendarId(), request.getJobIdsToAdd(), request.getJobIdsToRemove(),
|
|
||||||
c -> {
|
c -> {
|
||||||
jobManager.updateProcessOnCalendarChanged(c.getJobIds());
|
jobManager.updateProcessOnCalendarChanged(c.getJobIds());
|
||||||
listener.onResponse(new PutCalendarAction.Response(c));
|
listener.onResponse(new PutCalendarAction.Response(c));
|
||||||
|
|
|
@ -10,7 +10,9 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -20,10 +22,12 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.xpack.XPackSettings;
|
import org.elasticsearch.xpack.XPackSettings;
|
||||||
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
|
|
||||||
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
|
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
|
||||||
|
import org.elasticsearch.xpack.ml.MLMetadataField;
|
||||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||||
import org.elasticsearch.xpack.ml.MlMetaIndex;
|
import org.elasticsearch.xpack.ml.MlMetaIndex;
|
||||||
|
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||||
|
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
|
||||||
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
||||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||||
import org.elasticsearch.xpack.ml.calendars.Calendar;
|
import org.elasticsearch.xpack.ml.calendars.Calendar;
|
||||||
|
@ -136,6 +140,14 @@ public class JobProviderIT extends MlSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUpdateCalendar() throws Exception {
|
public void testUpdateCalendar() throws Exception {
|
||||||
|
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
|
||||||
|
mlBuilder.putJob(createJob("foo").build(), false);
|
||||||
|
mlBuilder.putJob(createJob("bar").build(), false);
|
||||||
|
|
||||||
|
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, mlBuilder.build()))
|
||||||
|
.build();
|
||||||
|
|
||||||
String calendarId = "empty calendar";
|
String calendarId = "empty calendar";
|
||||||
Calendar emptyCal = new Calendar(calendarId, Collections.emptyList(), null);
|
Calendar emptyCal = new Calendar(calendarId, Collections.emptyList(), null);
|
||||||
indexCalendars(Collections.singletonList(emptyCal));
|
indexCalendars(Collections.singletonList(emptyCal));
|
||||||
|
@ -143,7 +155,7 @@ public class JobProviderIT extends MlSingleNodeTestCase {
|
||||||
Set<String> addedIds = new HashSet<>();
|
Set<String> addedIds = new HashSet<>();
|
||||||
addedIds.add("foo");
|
addedIds.add("foo");
|
||||||
addedIds.add("bar");
|
addedIds.add("bar");
|
||||||
updateCalendar(calendarId, addedIds, Collections.emptySet());
|
updateCalendar(calendarId, addedIds, Collections.emptySet(), clusterState);
|
||||||
|
|
||||||
Calendar updated = getCalendar(calendarId);
|
Calendar updated = getCalendar(calendarId);
|
||||||
assertEquals(calendarId, updated.getId());
|
assertEquals(calendarId, updated.getId());
|
||||||
|
@ -151,7 +163,7 @@ public class JobProviderIT extends MlSingleNodeTestCase {
|
||||||
|
|
||||||
Set<String> removedIds = new HashSet<>();
|
Set<String> removedIds = new HashSet<>();
|
||||||
removedIds.add("foo");
|
removedIds.add("foo");
|
||||||
updateCalendar(calendarId, Collections.emptySet(), removedIds);
|
updateCalendar(calendarId, Collections.emptySet(), removedIds, clusterState);
|
||||||
|
|
||||||
updated = getCalendar(calendarId);
|
updated = getCalendar(calendarId);
|
||||||
assertEquals(calendarId, updated.getId());
|
assertEquals(calendarId, updated.getId());
|
||||||
|
@ -240,10 +252,10 @@ public class JobProviderIT extends MlSingleNodeTestCase {
|
||||||
return result.get().results();
|
return result.get().results();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String> idsToRemove) throws Exception {
|
private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String> idsToRemove, ClusterState clusterState) throws Exception {
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
|
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
|
||||||
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove,
|
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove, clusterState,
|
||||||
r -> latch.countDown(),
|
r -> latch.countDown(),
|
||||||
e -> {
|
e -> {
|
||||||
exceptionHolder.set(e);
|
exceptionHolder.set(e);
|
||||||
|
|
|
@ -252,10 +252,10 @@
|
||||||
job_id: "missing job"
|
job_id: "missing job"
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
catch: missing
|
catch: /Cannot remove \[missing_job\] as it is not present in calendar \[wildlife\]/
|
||||||
xpack.ml.delete_calendar_job:
|
xpack.ml.delete_calendar_job:
|
||||||
calendar_id: "wildlife"
|
calendar_id: "wildlife"
|
||||||
job_id: "missing job"
|
job_id: "missing_job"
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test calendar get events":
|
"Test calendar get events":
|
||||||
|
@ -622,3 +622,12 @@
|
||||||
xpack.ml.delete_calendar_event:
|
xpack.ml.delete_calendar_event:
|
||||||
calendar_id: "unknown"
|
calendar_id: "unknown"
|
||||||
event_id: "event_1"
|
event_id: "event_1"
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test delete job from non existing calendar":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
catch: /No calendar with id \[unknown\]/
|
||||||
|
xpack.ml.delete_calendar_job:
|
||||||
|
calendar_id: "unknown"
|
||||||
|
job_id: "missing_job"
|
||||||
|
|
|
@ -27,6 +27,7 @@ integTestRunner {
|
||||||
'ml/calendar_crud/Test post calendar events given empty events',
|
'ml/calendar_crud/Test post calendar events given empty events',
|
||||||
'ml/calendar_crud/Test put calendar given id contains invalid chars',
|
'ml/calendar_crud/Test put calendar given id contains invalid chars',
|
||||||
'ml/calendar_crud/Test delete event from non existing calendar',
|
'ml/calendar_crud/Test delete event from non existing calendar',
|
||||||
|
'ml/calendar_crud/Test delete job from non existing calendar',
|
||||||
'ml/custom_all_field/Test querying custom all field',
|
'ml/custom_all_field/Test querying custom all field',
|
||||||
'ml/datafeeds_crud/Test delete datafeed with missing id',
|
'ml/datafeeds_crud/Test delete datafeed with missing id',
|
||||||
'ml/datafeeds_crud/Test put datafeed referring to missing job_id',
|
'ml/datafeeds_crud/Test put datafeed referring to missing job_id',
|
||||||
|
|
Loading…
Reference in New Issue