[ML] Improve reason why datafeed is stopped.

Original commit: elastic/x-pack-elasticsearch@2055a64689
This commit is contained in:
Martijn van Groningen 2017-03-02 18:03:42 +01:00
parent 0542a9eb92
commit e9e14cbcea
4 changed files with 15 additions and 12 deletions

View File

@ -262,18 +262,18 @@ public class StartDatafeedAction
@Override @Override
protected void onCancelled() { protected void onCancelled() {
stop(); stop(getReasonCancelled());
} }
public void stop() { public void stop(String reason) {
stop(TimeValue.timeValueSeconds(20)); stop(reason, TimeValue.timeValueSeconds(20));
} }
public void stop(TimeValue timeout) { public void stop(String reason, TimeValue timeout) {
if (holder == null) { if (holder == null) {
throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder"); throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder");
} }
holder.stop("cancel", timeout, null); holder.stop(reason, timeout, null);
} }
} }

View File

@ -252,7 +252,7 @@ public class StopDatafeedAction
@Override @Override
protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) { protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) {
task.stop(request.getTimeout()); task.stop("stop_datafeed_api", request.getTimeout());
listener.onResponse(new Response(true)); listener.onResponse(new Response(true));
} }

View File

@ -271,18 +271,21 @@ public class DatafeedJobRunner extends AbstractComponent {
if (datafeedJob.stop()) { if (datafeedJob.stop()) {
boolean acquired = false; boolean acquired = false;
try { try {
logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", source, timeout, datafeed.getId(),
datafeed.getJobId());
acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS); acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS);
logger.info("[{}] stopping datafeed [{}] for job [{}]...", source, datafeed.getId(), datafeed.getJobId()); } catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} finally {
logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(),
datafeed.getJobId(), acquired);
FutureUtils.cancel(future); FutureUtils.cancel(future);
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
handler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
if (autoCloseJob) { if (autoCloseJob) {
closeJob(); closeJob();
} }
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} finally {
handler.accept(e);
if (acquired) { if (acquired) {
datafeedJobLock.unlock(); datafeedJobLock.unlock();
} }

View File

@ -273,7 +273,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
if (cancelled) { if (cancelled) {
task.stop(); task.stop("test");
verify(handler).accept(null); verify(handler).accept(null);
} else { } else {
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id"))); verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id")));