diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java
index c93c5d3dfd9..01e5a7d3425 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.datafeed;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -17,6 +18,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlMetadata;
@@ -429,7 +431,15 @@ public class DatafeedManager extends AbstractComponent {
 
                         @Override
                         public void onFailure(Exception e) {
-                            logger.error("[" + getJobId() + "] failed to auto-close job", e);
+                            // Given that the UI force-deletes the datafeed and then force-deletes the job, it's
+                            // quite likely that the auto-close here will get interrupted by a process kill request,
+                            // and it's misleading/worrying to log an error in this case.
+                            if (e instanceof ElasticsearchStatusException &&
+                                    ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
+                                logger.debug("[{}] {}", getJobId(), e.getMessage());
+                            } else {
+                                logger.error("[" + getJobId() + "] failed to auto-close job", e);
+                            }
                         }
                     });
                 }
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
index 04624ab05da..dc3f303ecea 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.logging.Loggers;
@@ -32,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
+import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -154,7 +154,12 @@ public class AutodetectCommunicator implements Closeable {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } catch (ExecutionException e) {
-            throw ExceptionsHelper.convertToElastic(e);
+            if (processKilled) {
+                // In this case the original exception is spurious and highly misleading
+                throw ExceptionsHelper.conflictStatusException("Close job interrupted by kill request");
+            } else {
+                throw new ElasticsearchException(e);
+            }
         }
     }
 
@@ -242,19 +247,15 @@ public class AutodetectCommunicator implements Closeable {
      */
     private void checkProcessIsAlive() {
         if (!autodetectProcess.isProcessAlive()) {
-            ParameterizedMessage message =
-                    new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
-            LOGGER.error(message);
-            throw new ElasticsearchException(message.getFormattedMessage());
+            // Don't log here - it just causes double logging when the exception gets logged
+            throw new ElasticsearchException("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
         }
     }
 
     private void checkResultsProcessorIsAlive() {
         if (autoDetectResultProcessor.isFailed()) {
-            ParameterizedMessage message =
-                    new ParameterizedMessage("[{}] Unexpected death of the result processor", job.getId());
-            LOGGER.error(message);
-            throw new ElasticsearchException(message.getFormattedMessage());
+            // Don't log here - it just causes double logging when the exception gets logged
+            throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
         }
     }
 
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
index 53e86bda359..40a343595a5 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
@@ -473,6 +473,10 @@ public class AutodetectProcessManager extends AbstractComponent {
             communicator.close(restart, reason);
             processByAllocation.remove(allocationId);
         } catch (Exception e) {
+            // If the close failed because the process has explicitly been killed by us then just pass on that exception
+            if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
+                throw e;
+            }
             logger.warn("[" + jobId + "] Exception closing autodetect process", e);
             setJobState(jobTask, JobState.FAILED);
             throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java
index d8460764eac..ce6d7ab66b6 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java
@@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
 import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
@@ -17,6 +18,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.ml.action.KillProcessAction;
 import org.elasticsearch.xpack.ml.action.PutJobAction;
 import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
+import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.ml.job.config.Job;
@@ -32,10 +34,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 
 public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
 
@@ -223,6 +227,59 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         });
     }
 
+    /**
+     * Stopping a lookback closes the associated job _after_ the stop call returns.
+     * This test ensures that a kill request submitted during this close doesn't
+     * put the job into the "failed" state.
+     */
+    public void testStopLookbackFollowedByProcessKill() throws Exception {
+        client().admin().indices().prepareCreate("data")
+                .addMapping("type", "time", "type=date")
+                .get();
+        long numDocs = randomIntBetween(1024, 2048);
+        long now = System.currentTimeMillis();
+        long oneWeekAgo = now - 604800000;
+        long twoWeeksAgo = oneWeekAgo - 604800000;
+        indexDocs(logger, "data", numDocs, twoWeeksAgo, oneWeekAgo);
+
+        Job.Builder job = createScheduledJob("lookback-job-stopped-then-killed");
+        registerJob(job);
+        PutJobAction.Response putJobResponse = putJob(job);
+        assertTrue(putJobResponse.isAcknowledged());
+        assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
+        openJob(job.getId());
+        assertBusy(() -> assertEquals(JobState.OPENED, getJobStats(job.getId()).get(0).getState()));
+
+        List<String> t = Collections.singletonList("data");
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), t);
+        // Use lots of chunks so we have time to stop the lookback before it completes
+        datafeedConfigBuilder.setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS)));
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+        registerDatafeed(datafeedConfig);
+        assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
+
+        startDatafeed(datafeedConfig.getId(), 0L, now);
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCounts(job.getId());
+            assertThat(dataCounts.getProcessedRecordCount(), greaterThan(0L));
+        }, 60, TimeUnit.SECONDS);
+
+        stopDatafeed(datafeedConfig.getId());
+
+        // At this point, stopping the datafeed will have submitted a request for the job to close.
+        // Depending on thread scheduling, the following kill request might overtake it. The Thread.sleep()
+        // call here makes it more likely; to make it inevitable for testing also add a Thread.sleep(10)
+        // immediately before the checkProcessIsAlive() call in AutodetectCommunicator.close().
+        Thread.sleep(randomIntBetween(1, 9));
+
+        KillProcessAction.Request killRequest = new KillProcessAction.Request(job.getId());
+        client().execute(KillProcessAction.INSTANCE, killRequest).actionGet();
+
+        // This should close very quickly, as we killed the process. If the job goes into the "failed"
+        // state that's wrong and this test will fail.
+        waitUntilJobIsClosed(job.getId(), TimeValue.timeValueSeconds(2));
+    }
+
     private void startRealtime(String jobId) throws Exception {
         client().admin().indices().prepareCreate("data")
                 .addMapping("type", "time", "type=date")
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
index 4d2f644c7c5..c5606138d80 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
@@ -233,7 +233,12 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
     }
 
     protected void waitUntilJobIsClosed(String jobId) throws Exception {
-        assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), 30, TimeUnit.SECONDS);
+        waitUntilJobIsClosed(jobId, TimeValue.timeValueSeconds(30));
+    }
+
+    protected void waitUntilJobIsClosed(String jobId, TimeValue waitTime) throws Exception {
+        assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)),
+                waitTime.getMillis(), TimeUnit.MILLISECONDS);
     }
 
     protected List getJob(String jobId) {