[ML] Account for job groups membership when getting events (elastic/x-pack-elasticsearch#3473)
* Account for job groups membership when getting events Original commit: elastic/x-pack-elasticsearch@a4185cc460
This commit is contained in:
parent
aa25704170
commit
66b187fe17
|
@ -20,6 +20,10 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A simple calendar object for scheduled (special) events.
|
||||
* The calendar consists of a name an a list of job Ids or job groups.
|
||||
*/
|
||||
public class Calendar implements ToXContentObject, Writeable {
|
||||
|
||||
public static final String CALENDAR_TYPE = "calendar";
|
||||
|
@ -49,6 +53,11 @@ public class Calendar implements ToXContentObject, Writeable {
|
|||
private final String id;
|
||||
private final List<String> jobIds;
|
||||
|
||||
/**
|
||||
* {@code jobIds} can be a mix of job groups and job Ids
|
||||
* @param id The calendar Id
|
||||
* @param jobIds List of job Ids or job groups.
|
||||
*/
|
||||
public Calendar(String id, List<String> jobIds) {
|
||||
this.id = Objects.requireNonNull(id, ID.getPreferredName() + " must not be null");
|
||||
this.jobIds = Objects.requireNonNull(jobIds, JOB_IDS.getPreferredName() + " must not be null");
|
||||
|
|
|
@ -12,10 +12,15 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|||
import org.elasticsearch.xpack.ml.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.ml.calendars.Calendar;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class CalendarQueryBuilder {
|
||||
|
||||
private PageParams pageParams = new PageParams(0, 10000);
|
||||
private String jobId;
|
||||
private List<String> jobGroups = Collections.emptyList();
|
||||
private boolean sort = false;
|
||||
|
||||
/**
|
||||
|
@ -38,6 +43,11 @@ public class CalendarQueryBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public CalendarQueryBuilder jobGroups(List<String> jobGroups) {
|
||||
this.jobGroups = jobGroups;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort results by calendar_id
|
||||
* @param sort Sort if true
|
||||
|
@ -50,10 +60,15 @@ public class CalendarQueryBuilder {
|
|||
|
||||
public SearchSourceBuilder build() {
|
||||
QueryBuilder qb;
|
||||
List<String> jobIdAndGroups = new ArrayList<>(jobGroups);
|
||||
if (jobId != null) {
|
||||
jobIdAndGroups.add(jobId);
|
||||
}
|
||||
|
||||
if (jobIdAndGroups.isEmpty() == false) {
|
||||
qb = new BoolQueryBuilder()
|
||||
.filter(new TermsQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE))
|
||||
.filter(new TermsQueryBuilder(Calendar.JOB_IDS.getPreferredName(), jobId));
|
||||
.filter(new TermsQueryBuilder(Calendar.JOB_IDS.getPreferredName(), jobIdAndGroups));
|
||||
} else {
|
||||
qb = new TermsQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE);
|
||||
}
|
||||
|
|
|
@ -373,7 +373,7 @@ public class JobProvider {
|
|||
if (lastestRecordTime != null) {
|
||||
scheduledEventsQueryBuilder.after(Long.toString(lastestRecordTime.getTime()));
|
||||
}
|
||||
scheduledEventsForJob(jobId, scheduledEventsQueryBuilder, ActionListener.wrap(
|
||||
scheduledEventsForJob(jobId, job.getGroups(), scheduledEventsQueryBuilder, ActionListener.wrap(
|
||||
events -> {
|
||||
paramsBuilder.setScheduledEvents(events.results());
|
||||
consumer.accept(paramsBuilder.build());
|
||||
|
@ -1029,9 +1029,8 @@ public class JobProvider {
|
|||
});
|
||||
}
|
||||
|
||||
public void scheduledEventsForJob(String jobId, ScheduledEventsQueryBuilder queryBuilder,
|
||||
public void scheduledEventsForJob(String jobId, List<String> jobGroups, ScheduledEventsQueryBuilder queryBuilder,
|
||||
ActionListener<QueryPage<ScheduledEvent>> handler) {
|
||||
|
||||
// Find all the calendars used by the job then the events for those calendars
|
||||
|
||||
ActionListener<QueryPage<Calendar>> calendarsListener = ActionListener.wrap(
|
||||
|
@ -1047,7 +1046,7 @@ public class JobProvider {
|
|||
handler::onFailure
|
||||
);
|
||||
|
||||
CalendarQueryBuilder query = new CalendarQueryBuilder().jobId(jobId);
|
||||
CalendarQueryBuilder query = new CalendarQueryBuilder().jobId(jobId).jobGroups(jobGroups);
|
||||
calendars(query, calendarsListener);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,31 +8,40 @@ package org.elasticsearch.xpack.ml.action;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class TransportGetCalendarEventsAction extends HandledTransportAction<GetCalendarEventsAction.Request,
|
||||
GetCalendarEventsAction.Response> {
|
||||
|
||||
private final JobProvider jobProvider;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject
|
||||
public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool,
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
JobProvider jobProvider) {
|
||||
ClusterService clusterService, JobProvider jobProvider) {
|
||||
super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
|
||||
indexNameExpressionResolver, GetCalendarEventsAction.Request::new);
|
||||
this.jobProvider = jobProvider;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,7 +67,26 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<Get
|
|||
);
|
||||
|
||||
if (request.getJobId() != null) {
|
||||
jobProvider.scheduledEventsForJob(request.getJobId(), query, eventsListener);
|
||||
ClusterState state = clusterService.state();
|
||||
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
|
||||
|
||||
List<String> jobGroups;
|
||||
String requestId = request.getJobId();
|
||||
|
||||
Job job = currentMlMetadata.getJobs().get(request.getJobId());
|
||||
if (job == null) {
|
||||
// Check if the requested id is a job group
|
||||
if (currentMlMetadata.isGroupOrJob(request.getJobId()) == false) {
|
||||
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
|
||||
return;
|
||||
}
|
||||
jobGroups = Collections.singletonList(request.getJobId());
|
||||
requestId = null;
|
||||
} else {
|
||||
jobGroups = job.getGroups();
|
||||
}
|
||||
|
||||
jobProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
|
||||
} else {
|
||||
jobProvider.scheduledEvents(query, eventsListener);
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
|||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
|
||||
|
@ -59,7 +58,9 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
|
|||
protected void doExecute(PutCalendarAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
|
||||
Calendar calendar = request.getCalendar();
|
||||
|
||||
checkJobsExist(calendar.getJobIds(), listener::onFailure);
|
||||
if (checkJobsOrGroupsExist(calendar.getJobIds(), listener::onFailure) == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId());
|
||||
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
||||
|
@ -88,15 +89,16 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
|
|||
});
|
||||
}
|
||||
|
||||
private void checkJobsExist(List<String> jobIds, Consumer<Exception> errorHandler) {
|
||||
private boolean checkJobsOrGroupsExist(List<String> jobIds, Consumer<Exception> errorHandler) {
|
||||
ClusterState state = clusterService.state();
|
||||
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);
|
||||
for (String jobId: jobIds) {
|
||||
Set<String> jobs = mlMetadata.expandJobIds(jobId, true);
|
||||
if (jobs.isEmpty()) {
|
||||
if (mlMetadata.isGroupOrJob(jobId) == false) {
|
||||
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -274,10 +274,11 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
handler::accept);
|
||||
|
||||
if (updateParams.isUpdateScheduledEvents()) {
|
||||
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
|
||||
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().after(Long.toString(new Date().getTime()));
|
||||
jobProvider.scheduledEventsForJob(jobTask.getJobId(), query, eventsListener);
|
||||
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
|
||||
} else {
|
||||
eventsListener.onResponse(new QueryPage<ScheduledEvent>(Collections.emptyList(), 0, ScheduledEvent.RESULTS_FIELD));
|
||||
eventsListener.onResponse(new QueryPage<>(Collections.emptyList(), 0, ScheduledEvent.RESULTS_FIELD));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -306,18 +306,18 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
|
|||
indexScheduledEvents(events);
|
||||
|
||||
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder();
|
||||
List<ScheduledEvent> returnedEvents = getScheduledEventsForJob(jobA.getId(), query);
|
||||
List<ScheduledEvent> returnedEvents = getScheduledEventsForJob(jobA.getId(), Collections.emptyList(), query);
|
||||
assertEquals(4, returnedEvents.size());
|
||||
assertEquals(events.get(0), returnedEvents.get(0));
|
||||
assertEquals(events.get(1), returnedEvents.get(1));
|
||||
assertEquals(events.get(3), returnedEvents.get(2));
|
||||
assertEquals(events.get(2), returnedEvents.get(3));
|
||||
|
||||
returnedEvents = getScheduledEventsForJob(jobB.getId(), query);
|
||||
returnedEvents = getScheduledEventsForJob(jobB.getId(), Collections.singletonList("unrelated-job-group"), query);
|
||||
assertEquals(1, returnedEvents.size());
|
||||
assertEquals(events.get(3), returnedEvents.get(0));
|
||||
|
||||
returnedEvents = getScheduledEventsForJob(jobC.getId(), query);
|
||||
returnedEvents = getScheduledEventsForJob(jobC.getId(), Collections.emptyList(), query);
|
||||
assertEquals(0, returnedEvents.size());
|
||||
|
||||
// Test time filters
|
||||
|
@ -325,12 +325,44 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
|
|||
query.after(Long.toString(now.plusDays(8).plusHours(1).toInstant().toEpochMilli()));
|
||||
// Lands halfway through the 3rd event which should be returned
|
||||
query.before(Long.toString(now.plusDays(12).plusHours(1).toInstant().toEpochMilli()));
|
||||
returnedEvents = getScheduledEventsForJob(jobA.getId(), query);
|
||||
returnedEvents = getScheduledEventsForJob(jobA.getId(), Collections.emptyList(), query);
|
||||
assertEquals(2, returnedEvents.size());
|
||||
assertEquals(events.get(1), returnedEvents.get(0));
|
||||
assertEquals(events.get(3), returnedEvents.get(1));
|
||||
}
|
||||
|
||||
public void testScheduledEventsForJob_withGroup() throws Exception {
|
||||
String groupA = "group-a";
|
||||
String groupB = "group-b";
|
||||
createJob("job-in-group-a", Collections.emptyList(), Collections.singletonList(groupA));
|
||||
createJob("job-in-group-a-and-b", Collections.emptyList(), Arrays.asList(groupA, groupB));
|
||||
|
||||
String calendarAId = "calendar_a";
|
||||
List<Calendar> calendars = new ArrayList<>();
|
||||
calendars.add(new Calendar(calendarAId, Collections.singletonList(groupA)));
|
||||
|
||||
ZonedDateTime now = ZonedDateTime.now();
|
||||
List<ScheduledEvent> events = new ArrayList<>();
|
||||
events.add(buildScheduledEvent("downtime_A", now.plusDays(1), now.plusDays(2), calendarAId));
|
||||
|
||||
String calendarBId = "calendar_b";
|
||||
calendars.add(new Calendar(calendarBId, Arrays.asList(groupB)));
|
||||
events.add(buildScheduledEvent("downtime_B", now.plusDays(12), now.plusDays(13), calendarBId));
|
||||
|
||||
indexCalendars(calendars);
|
||||
indexScheduledEvents(events);
|
||||
|
||||
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder();
|
||||
List<ScheduledEvent> returnedEvents = getScheduledEventsForJob("job-in-group-a", Collections.singletonList(groupA), query);
|
||||
assertEquals(1, returnedEvents.size());
|
||||
assertEquals(events.get(0), returnedEvents.get(0));
|
||||
|
||||
query = new ScheduledEventsQueryBuilder();
|
||||
returnedEvents = getScheduledEventsForJob("job-in-group-a-and-b", Collections.singletonList(groupB), query);
|
||||
assertEquals(1, returnedEvents.size());
|
||||
assertEquals(events.get(1), returnedEvents.get(0));
|
||||
}
|
||||
|
||||
private ScheduledEvent buildScheduledEvent(String description, ZonedDateTime start, ZonedDateTime end, String calendarId) {
|
||||
return new ScheduledEvent.Builder().description(description).startTime(start).endTime(end).calendarId(calendarId).build();
|
||||
}
|
||||
|
@ -432,11 +464,12 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
|
|||
return searchResultHolder.get();
|
||||
}
|
||||
|
||||
private List<ScheduledEvent> getScheduledEventsForJob(String jobId, ScheduledEventsQueryBuilder query) throws Exception {
|
||||
private List<ScheduledEvent> getScheduledEventsForJob(String jobId, List<String> jobGroups, ScheduledEventsQueryBuilder query)
|
||||
throws Exception {
|
||||
AtomicReference<Exception> errorHolder = new AtomicReference<>();
|
||||
AtomicReference<QueryPage<ScheduledEvent>> searchResultHolder = new AtomicReference<>();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
jobProvider.scheduledEventsForJob(jobId, query, ActionListener.wrap(
|
||||
jobProvider.scheduledEventsForJob(jobId, jobGroups, query, ActionListener.wrap(
|
||||
params -> {
|
||||
searchResultHolder.set(params);
|
||||
latch.countDown();
|
||||
|
@ -458,7 +491,12 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
|
|||
}
|
||||
|
||||
private Job.Builder createJob(String jobId, List<String> filterIds) {
|
||||
return createJob(jobId, filterIds, Collections.emptyList());
|
||||
}
|
||||
|
||||
private Job.Builder createJob(String jobId, List<String> filterIds, List<String> jobGroups) {
|
||||
Job.Builder builder = new Job.Builder(jobId);
|
||||
builder.setGroups(jobGroups);
|
||||
AnalysisConfig.Builder ac = createAnalysisConfig(filterIds);
|
||||
DataDescription.Builder dc = new DataDescription.Builder();
|
||||
builder.setAnalysisConfig(ac);
|
||||
|
|
|
@ -503,3 +503,52 @@
|
|||
- length: { events: 2 }
|
||||
- match: { events.0.description: ny }
|
||||
- match: { events.1.description: other }
|
||||
|
||||
---
|
||||
"Test get calendar events with job groups":
|
||||
# Test job group
|
||||
- do:
|
||||
xpack.ml.put_job:
|
||||
job_id: cal-crud-job-with-events-group
|
||||
body: >
|
||||
{
|
||||
"analysis_config" : {
|
||||
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
|
||||
},
|
||||
"data_description" : {
|
||||
},
|
||||
"groups" : ["ben-holidays-group"]
|
||||
}
|
||||
|
||||
- do:
|
||||
xpack.ml.put_calendar:
|
||||
calendar_id: "ben-holidays"
|
||||
body: >
|
||||
{
|
||||
"job_ids": ["ben-holidays-group"]
|
||||
}
|
||||
|
||||
- do:
|
||||
xpack.ml.post_calendar_events:
|
||||
calendar_id: "ben-holidays"
|
||||
body: >
|
||||
{ "description": "ski", "start_time": "2018-01-20T00:00:00Z", "end_time": "2018-01-27T00:00:00Z" }
|
||||
{ "description": "snow", "start_time": "2018-01-30T00:00:00Z", "end_time": "2018-02-01T00:00:00Z" }
|
||||
|
||||
- do:
|
||||
xpack.ml.get_calendar_events:
|
||||
calendar_id: _all
|
||||
job_id: "cal-crud-job-with-events-group"
|
||||
- match: { count: 2 }
|
||||
- length: { events: 2 }
|
||||
- match: { events.0.description: ski }
|
||||
- match: { events.1.description: snow }
|
||||
|
||||
- do:
|
||||
xpack.ml.get_calendar_events:
|
||||
calendar_id: _all
|
||||
job_id: "ben-holidays-group"
|
||||
- match: { count: 2 }
|
||||
- length: { events: 2 }
|
||||
- match: { events.0.description: ski }
|
||||
- match: { events.1.description: snow }
|
||||
|
|
Loading…
Reference in New Issue