[ML] Expand job groups on scheduled events update (elastic/x-pack-elasticsearch#3577)
Relates elastic/x-pack-elasticsearch#3016 Original commit: elastic/x-pack-elasticsearch@73334d8e01
This commit is contained in:
parent
0b8723015f
commit
368c4fff56
|
@ -52,6 +52,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
@ -132,6 +133,14 @@ public class JobManager extends AbstractComponent {
|
|||
return job;
|
||||
}
|
||||
|
||||
private Set<String> expandJobIds(String expression, boolean allowNoJobs, ClusterState clusterState) {
|
||||
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
|
||||
if (mlMetadata == null) {
|
||||
mlMetadata = MlMetadata.EMPTY_METADATA;
|
||||
}
|
||||
return mlMetadata.expandJobIds(expression, allowNoJobs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the jobs that match the given {@code expression}.
|
||||
* Note that when the {@code jobId} is {@link MetaData#ALL} all jobs are returned.
|
||||
|
@ -142,11 +151,8 @@ public class JobManager extends AbstractComponent {
|
|||
* @return A {@link QueryPage} containing the matching {@code Job}s
|
||||
*/
|
||||
public QueryPage<Job> expandJobs(String expression, boolean allowNoJobs, ClusterState clusterState) {
|
||||
Set<String> expandedJobIds = expandJobIds(expression, allowNoJobs, clusterState);
|
||||
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
|
||||
if (mlMetadata == null) {
|
||||
mlMetadata = MlMetadata.EMPTY_METADATA;
|
||||
}
|
||||
Set<String> expandedJobIds = mlMetadata.expandJobIds(expression, allowNoJobs);
|
||||
List<Job> jobs = new ArrayList<>();
|
||||
for (String expandedJobId : expandedJobIds) {
|
||||
jobs.add(mlMetadata.getJobs().get(expandedJobId));
|
||||
|
@ -335,7 +341,9 @@ public class JobManager extends AbstractComponent {
|
|||
|
||||
public void updateProcessOnCalendarChanged(List<String> calendarJobIds) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
for (String jobId : calendarJobIds) {
|
||||
Set<String> expandedJobIds = new HashSet<>();
|
||||
calendarJobIds.stream().forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState)));
|
||||
for (String jobId : expandedJobIds) {
|
||||
if (isJobOpen(clusterState, jobId)) {
|
||||
updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId));
|
||||
}
|
||||
|
|
|
@ -266,6 +266,45 @@ public class JobManagerTests extends ESTestCase {
|
|||
assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true));
|
||||
}
|
||||
|
||||
public void testUpdateProcessOnCalendarChanged_GivenGroups() {
|
||||
Job.Builder job1 = buildJobBuilder("job-1");
|
||||
job1.setGroups(Collections.singletonList("group-1"));
|
||||
Job.Builder job2 = buildJobBuilder("job-2");
|
||||
job2.setGroups(Collections.singletonList("group-1"));
|
||||
Job.Builder job3 = buildJobBuilder("job-3");
|
||||
|
||||
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
|
||||
mlMetadata.putJob(job1.build(), false);
|
||||
mlMetadata.putJob(job2.build(), false);
|
||||
mlMetadata.putJob(job3.build(), false);
|
||||
|
||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||
addJobTask(job1.getId(), "node_id", JobState.OPENED, tasksBuilder);
|
||||
addJobTask(job2.getId(), "node_id", JobState.OPENED, tasksBuilder);
|
||||
addJobTask(job3.getId(), "node_id", JobState.OPENED, tasksBuilder);
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder()
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())
|
||||
.putCustom(MLMetadataField.TYPE, mlMetadata.build()))
|
||||
.build();
|
||||
when(clusterService.state()).thenReturn(clusterState);
|
||||
|
||||
JobManager jobManager = createJobManager();
|
||||
|
||||
jobManager.updateProcessOnCalendarChanged(Collections.singletonList("group-1"));
|
||||
|
||||
ArgumentCaptor<UpdateParams> updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class);
|
||||
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture());
|
||||
|
||||
List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
|
||||
assertThat(capturedUpdateParams.size(), equalTo(2));
|
||||
assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(job1.getId()));
|
||||
assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true));
|
||||
assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(job2.getId()));
|
||||
assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true));
|
||||
}
|
||||
|
||||
private Job.Builder createJob() {
|
||||
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
|
||||
d1.setOverFieldName("client");
|
||||
|
|
Loading…
Reference in New Issue