fixed scheduler job integration tests

Original commit: elastic/x-pack-elasticsearch@bb522d9d6d
This commit is contained in:
Martijn van Groningen 2016-12-27 22:54:07 +01:00
parent b94b79a411
commit ec8fb6c99f
5 changed files with 23 additions and 19 deletions

View File

@ -125,7 +125,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
public static final String NAME = "prelert";
public static final String BASE_PATH = "/_xpack/prelert/";
public static final String THREAD_POOL_NAME = NAME;
public static final String SCHEDULER_THREAD_POOL_NAME = NAME + "_scheduler";
public static final String SCHEDULED_RUNNER_THREAD_POOL_NAME = NAME + "_scheduled_runner";
public static final String AUTODETECT_PROCESS_THREAD_POOL_NAME = NAME + "_autodetect_process";
// NORELEASE - temporary solution
@ -313,7 +313,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
// TODO: if scheduled and non scheduled jobs are considered more equal and the scheduler and
// autodetect process are created at the same time then these two different TPs can merge.
FixedExecutorBuilder scheduler = new FixedExecutorBuilder(settings, SCHEDULER_THREAD_POOL_NAME,
FixedExecutorBuilder scheduler = new FixedExecutorBuilder(settings, SCHEDULED_RUNNER_THREAD_POOL_NAME,
maxNumberOfJobs, 1, "xpack.prelert.scheduler_thread_pool");
return Arrays.asList(prelert, autoDetect, scheduler);
}

View File

@ -198,7 +198,7 @@ public class StartSchedulerAction
/* public for testing */
public void stop() {
if (holder != null) {
holder.stop();
holder.stop(null);
}
}
}

View File

@ -66,18 +66,23 @@ public class ScheduledJobRunner extends AbstractComponent {
validate(schedulerId, prelertMetadata);
setJobSchedulerStatus(schedulerId, SchedulerStatus.STARTED, error -> {
if (error != null) {
handler.accept(error);
return;
}
Scheduler scheduler = prelertMetadata.getScheduler(schedulerId);
logger.info("Starting scheduler [{}] for job [{}]", schedulerId, scheduler.getJobId());
Job job = prelertMetadata.getJobs().get(scheduler.getJobId());
Holder holder = createJobScheduler(scheduler, job, handler);
task.setHolder(holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> {
holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> {
try {
Long next = holder.scheduledJob.runLookBack(startTime, endTime);
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
} else {
holder.stop();
holder.stop(null);
}
} catch (ScheduledJob.ExtractionProblemException e) {
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
@ -85,11 +90,11 @@ public class ScheduledJobRunner extends AbstractComponent {
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (holder.problemTracker.updateEmptyDataCount(true)) {
holder.stop();
holder.stop(e);
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + job.getId() + "]", e);
holder.stop();
holder.stop(e);
}
holder.problemTracker.finishReport();
});
@ -100,7 +105,7 @@ public class ScheduledJobRunner extends AbstractComponent {
if (holder.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> {
holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME, () -> {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.scheduledJob.runRealtime();
@ -114,19 +119,19 @@ public class ScheduledJobRunner extends AbstractComponent {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
if (holder.problemTracker.updateEmptyDataCount(true)) {
holder.problemTracker.finishReport();
holder.stop();
holder.stop(e);
return;
}
} catch (Exception e) {
logger.error("Unexpected scheduler failure for job [" + jobId + "] stopping...", e);
holder.stop();
holder.stop(e);
return;
}
holder.problemTracker.finishReport();
doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder);
});
} else {
holder.stop();
holder.stop(null);
}
}
@ -245,11 +250,11 @@ public class ScheduledJobRunner extends AbstractComponent {
return scheduledJob.isRunning();
}
public void stop() {
public void stop(Exception e) {
logger.info("Stopping scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId());
scheduledJob.stop();
FutureUtils.cancel(future);
setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(null));
setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e));
}
}

View File

@ -32,7 +32,6 @@ import static org.mockito.Mockito.mock;
public class JobResultsPersisterTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo";
public void testPersistBucket_OneRecord() throws IOException {
@ -168,7 +167,7 @@ public class JobResultsPersisterTests extends ESTestCase {
private Client mockClient(AtomicReference reference) {
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
reference.set((BulkRequest) invocationOnMock.getArguments()[1]);
reference.set(invocationOnMock.getArguments()[1]);
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return null;

View File

@ -96,7 +96,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService);
when(client.execute(same(JobDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
@ -133,7 +133,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
@ -167,14 +167,14 @@ public class ScheduledJobRunnerTests extends ESTestCase {
scheduledJobRunner.run("scheduler1", 0L, null, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
if (cancelled) {
task.stop();
verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any());
} else {
verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME), any());
}
}