[ML] Enable adding multiple jobs to a calendar (elastic/x-pack-elasticsearch#3786)

Original commit: elastic/x-pack-elasticsearch@56a70a4580
This commit is contained in:
David Kyle 2018-02-08 11:44:16 +00:00 committed by GitHub
parent a57999e1e1
commit 8e73085047
8 changed files with 125 additions and 52 deletions

View File

@ -19,7 +19,8 @@ This API enables you to delete jobs from a calendar.
(string) Identifier for the calendar.
`job_id` (required)::
(string) Identifier for the job.
(string) An identifier for the job. It can be a job identifier, a group name, or a
comma-separated list of jobs or groups.
==== Authorization

View File

@ -18,8 +18,8 @@ This API enables you to add a job to a calendar.
(string) Identifier for the calendar.
`job_id` (required)::
(string) Identifier for the job.
(string) An identifier for the job. It can be a job identifier, a group name, or a
comma-separated list of jobs or groups.
==== Authorization

View File

@ -16,7 +16,6 @@ import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
public class UpdateCalendarJobAction extends Action<UpdateCalendarJobAction.Request, PutCalendarAction.Response,
@ -41,28 +40,32 @@ public class UpdateCalendarJobAction extends Action<UpdateCalendarJobAction.Requ
public static class Request extends ActionRequest {
private String calendarId;
private String jobIdToAdd;
private String jobIdToRemove;
private String jobIdsToAddExpression;
private String jobIdsToRemoveExpression;
public Request() {
}
public Request(String calendarId, String jobIdToAdd, String jobIdToRemove) {
/**
* Job id expressions may be a single job, job group or comma separated
* list of job Ids or groups
*/
public Request(String calendarId, String jobIdsToAddExpression, String jobIdsToRemoveExpression) {
this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName());
this.jobIdToAdd = jobIdToAdd;
this.jobIdToRemove = jobIdToRemove;
this.jobIdsToAddExpression = jobIdsToAddExpression;
this.jobIdsToRemoveExpression = jobIdsToRemoveExpression;
}
public String getCalendarId() {
return calendarId;
}
public String getJobIdToAdd() {
return jobIdToAdd;
public String getJobIdsToAddExpression() {
return jobIdsToAddExpression;
}
public String getJobIdToRemove() {
return jobIdToRemove;
public String getJobIdsToRemoveExpression() {
return jobIdsToRemoveExpression;
}
@Override
@ -74,21 +77,21 @@ public class UpdateCalendarJobAction extends Action<UpdateCalendarJobAction.Requ
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
jobIdToAdd = in.readOptionalString();
jobIdToRemove = in.readOptionalString();
jobIdsToAddExpression = in.readOptionalString();
jobIdsToRemoveExpression = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeOptionalString(jobIdToAdd);
out.writeOptionalString(jobIdToRemove);
out.writeOptionalString(jobIdsToAddExpression);
out.writeOptionalString(jobIdsToRemoveExpression);
}
@Override
public int hashCode() {
return Objects.hash(calendarId, jobIdToAdd, jobIdToRemove);
return Objects.hash(calendarId, jobIdsToAddExpression, jobIdsToRemoveExpression);
}
@Override
@ -100,8 +103,8 @@ public class UpdateCalendarJobAction extends Action<UpdateCalendarJobAction.Requ
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId) && Objects.equals(jobIdToAdd, other.jobIdToAdd)
&& Objects.equals(jobIdToRemove, other.jobIdToRemove);
return Objects.equals(calendarId, other.calendarId) && Objects.equals(jobIdsToAddExpression, other.jobIdsToAddExpression)
&& Objects.equals(jobIdsToRemoveExpression, other.jobIdsToRemoveExpression);
}
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class UpdateCalendarJobActionResquestTests extends AbstractStreamableTestCase<UpdateCalendarJobAction.Request> {
@Override

View File

@ -8,12 +8,16 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.job.JobManager;
@ -42,17 +46,16 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction<Upd
@Override
protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
Set<String> jobIdsToAdd = new HashSet<>();
if (request.getJobIdToAdd() != null && request.getJobIdToAdd().isEmpty() == false) {
jobIdsToAdd.add(request.getJobIdToAdd());
}
Set<String> jobIdsToRemove = new HashSet<>();
if (request.getJobIdToRemove() != null && request.getJobIdToRemove().isEmpty() == false) {
jobIdsToRemove.add(request.getJobIdToRemove());
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove, clusterService.state(),
Set<String> jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression());
Set<String> jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression());
jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove, mlMetadata,
c -> {
jobManager.updateProcessOnCalendarChanged(c.getJobIds());
listener.onResponse(new PutCalendarAction.Response(c));

View File

@ -67,7 +67,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
@ -1082,7 +1081,7 @@ public class JobProvider {
result -> handler.accept(result.result), errorHandler, () -> null);
}
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove, ClusterState clusterState,
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove, MlMetadata mlMetadata,
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {
ActionListener<Calendar> getCalendarListener = ActionListener.wrap(
@ -1090,7 +1089,6 @@ public class JobProvider {
Set<String> currentJobs = new HashSet<>(calendar.getJobIds());
for (String jobToAdd : jobIdsToAdd) {
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata.isGroupOrJob(jobToAdd) == false) {
errorHandler.accept(ExceptionsHelper.missingJobException(jobToAdd));
return;

View File

@ -10,9 +10,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -22,7 +20,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
@ -67,7 +64,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
@ -129,7 +125,7 @@ public class JobProviderIT extends MlSingleNodeTestCase {
assertThat(queryResult, hasSize(3));
Long matchedCount = queryResult.stream().filter(
c -> c.getId().equals("foo calendar") || c.getId().equals("foo bar calendar") || c.getId().equals("cat foo calendar"))
.collect(Collectors.counting());
.count();
assertEquals(new Long(3), matchedCount);
queryResult = getCalendars("bar");
@ -142,10 +138,6 @@ public class JobProviderIT extends MlSingleNodeTestCase {
mlBuilder.putJob(createJob("foo").build(), false);
mlBuilder.putJob(createJob("bar").build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, mlBuilder.build()))
.build();
String calendarId = "empty calendar";
Calendar emptyCal = new Calendar(calendarId, Collections.emptyList(), null);
indexCalendars(Collections.singletonList(emptyCal));
@ -153,15 +145,13 @@ public class JobProviderIT extends MlSingleNodeTestCase {
Set<String> addedIds = new HashSet<>();
addedIds.add("foo");
addedIds.add("bar");
updateCalendar(calendarId, addedIds, Collections.emptySet(), clusterState);
updateCalendar(calendarId, addedIds, Collections.emptySet(), mlBuilder.build());
Calendar updated = getCalendar(calendarId);
assertEquals(calendarId, updated.getId());
assertEquals(addedIds, new HashSet<>(updated.getJobIds()));
Set<String> removedIds = new HashSet<>();
removedIds.add("foo");
updateCalendar(calendarId, Collections.emptySet(), removedIds, clusterState);
updateCalendar(calendarId, Collections.emptySet(), Collections.singleton("foo"), mlBuilder.build());
updated = getCalendar(calendarId);
assertEquals(calendarId, updated.getId());
@ -250,11 +240,11 @@ public class JobProviderIT extends MlSingleNodeTestCase {
return result.get().results();
}
private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String> idsToRemove, ClusterState clusterState)
private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String> idsToRemove, MlMetadata mlMetadata)
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove, clusterState,
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove, mlMetadata,
r -> latch.countDown(),
e -> {
exceptionHolder.set(e);
@ -357,7 +347,7 @@ public class JobProviderIT extends MlSingleNodeTestCase {
events.add(buildScheduledEvent("downtime_A", now.plusDays(1), now.plusDays(2), calendarAId));
String calendarBId = "calendar_b";
calendars.add(new Calendar(calendarBId, Arrays.asList(groupB), null));
calendars.add(new Calendar(calendarBId, Collections.singletonList(groupB), null));
events.add(buildScheduledEvent("downtime_B", now.plusDays(12), now.plusDays(13), calendarBId));
indexCalendars(calendars);

View File

@ -218,6 +218,19 @@
}
- match: { job_id: "tiger" }
- do:
xpack.ml.put_job:
job_id: otter
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- match: { job_id: "otter" }
- do:
xpack.ml.put_calendar_job:
calendar_id: "wildlife"
@ -240,6 +253,12 @@
- match: { calendar_id: "wildlife" }
- length: { job_ids: 0 }
- do:
catch: /Cannot remove \[otter\] as it is not present in calendar \[wildlife\]/
xpack.ml.delete_calendar_job:
calendar_id: "wildlife"
job_id: "otter"
- do:
xpack.ml.get_calendars:
calendar_id: "wildlife"
@ -251,14 +270,13 @@
catch: missing
xpack.ml.put_calendar_job:
calendar_id: "wildlife"
job_id: "missing job"
job_id: "missing_job"
- do:
catch: /Cannot remove \[missing_job\] as it is not present in calendar \[wildlife\]/
xpack.ml.delete_calendar_job:
calendar_id: "wildlife"
job_id: "missing_job"
---
"Test calendar get events":
@ -632,4 +650,65 @@
catch: /No calendar with id \[unknown\]/
xpack.ml.delete_calendar_job:
calendar_id: "unknown"
job_id: "missing_job"
job_id: "missing_calendar"
---
"Test list of job Ids":
- do:
xpack.ml.put_job:
job_id: foo-a
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- do:
xpack.ml.put_job:
job_id: foo-b
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- do:
xpack.ml.put_job:
job_id: bar-a
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- do:
xpack.ml.put_calendar:
calendar_id: "expression"
body: >
{
"job_ids": ["bar-a"]
}
- do:
xpack.ml.put_calendar_job:
calendar_id: "expression"
job_id: "foo-a,foo-b"
- match: { calendar_id: "expression" }
- length: { job_ids: 3 }
- do:
xpack.ml.delete_calendar_job:
calendar_id: "expression"
job_id: "foo-a,foo-b"
- match: { calendar_id: "expression" }
- length: { job_ids: 1 }
- match: { job_ids.0: "bar-a" }