[ML] Avoid spurious logging when deleting lookback job from the UI (elastic/x-pack-elasticsearch#3193)

When you click "delete" in the UI it force-deletes the datafeed then
force-deletes the job.  For a datafeed doing lookback, this results
in a close followed very quickly by a kill on the autodetect process.
Depending on thread scheduling, this could cause a lot of spurious
errors and exception traces to be logged.

This change prevents the log spam in this scenario.
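
In outline, the fix threads one fact through three layers: a kill request marks the process as deliberately killed; a close that fails while that mark is set is reported as a CONFLICT-status exception rather than a server error; and callers treat that particular status as expected, logging it at debug instead of error. A minimal self-contained sketch of the pattern follows; KillAwareProcess and awaitShutdown are illustrative stand-ins, not the actual x-pack classes, while ElasticsearchStatusException and RestStatus are the real Elasticsearch types.

    import org.elasticsearch.ElasticsearchStatusException;
    import org.elasticsearch.rest.RestStatus;

    class KillAwareProcess {
        private volatile boolean processKilled;

        void kill() {
            // Remember that any close failure from now on is self-inflicted
            processKilled = true;
            // ... forcibly terminate the underlying native process ...
        }

        void close() {
            try {
                awaitShutdown(); // may fail if the process is killed mid-close
            } catch (RuntimeException e) {
                if (processKilled) {
                    // Expected: our own kill interrupted the close; report a
                    // CONFLICT that callers can recognise and log quietly
                    throw new ElasticsearchStatusException(
                            "Close interrupted by kill request", RestStatus.CONFLICT);
                }
                throw e; // a genuine failure, worth an error log
            }
        }

        private void awaitShutdown() { /* wait for the process to exit */ }
    }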

relates elastic/x-pack-elasticsearch#3149

Original commit: elastic/x-pack-elasticsearch@091240f32a
David Roberts 2017-12-04 10:29:05 +00:00 committed by GitHub
parent df9dd77656
commit 2c978842da
5 changed files with 89 additions and 12 deletions

View File: DatafeedManager.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.datafeed;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -17,6 +18,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlMetadata;
@@ -429,7 +431,15 @@ public class DatafeedManager extends AbstractComponent {
 
                 @Override
                 public void onFailure(Exception e) {
-                    logger.error("[" + getJobId() + "] failed to auto-close job", e);
+                    // Given that the UI force-deletes the datafeed and then force-deletes the job, it's
+                    // quite likely that the auto-close here will get interrupted by a process kill request,
+                    // and it's misleading/worrying to log an error in this case.
+                    if (e instanceof ElasticsearchStatusException &&
+                            ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
+                        logger.debug("[{}] {}", getJobId(), e.getMessage());
+                    } else {
+                        logger.error("[" + getJobId() + "] failed to auto-close job", e);
+                    }
                 }
             });
         }

View File: AutodetectCommunicator.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.logging.Loggers;
@@ -32,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
+import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -154,7 +154,12 @@ public class AutodetectCommunicator implements Closeable {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } catch (ExecutionException e) {
-            throw ExceptionsHelper.convertToElastic(e);
+            if (processKilled) {
+                // In this case the original exception is spurious and highly misleading
+                throw ExceptionsHelper.conflictStatusException("Close job interrupted by kill request");
+            } else {
+                throw new ElasticsearchException(e);
+            }
         }
     }
 
@@ -242,19 +247,15 @@
      */
     private void checkProcessIsAlive() {
         if (!autodetectProcess.isProcessAlive()) {
-            ParameterizedMessage message =
-                    new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
-            LOGGER.error(message);
-            throw new ElasticsearchException(message.getFormattedMessage());
+            // Don't log here - it just causes double logging when the exception gets logged
+            throw new ElasticsearchException("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
         }
     }
 
     private void checkResultsProcessorIsAlive() {
         if (autoDetectResultProcessor.isFailed()) {
-            ParameterizedMessage message =
-                    new ParameterizedMessage("[{}] Unexpected death of the result processor", job.getId());
-            LOGGER.error(message);
-            throw new ElasticsearchException(message.getFormattedMessage());
+            // Don't log here - it just causes double logging when the exception gets logged
+            throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
         }
     }
 
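
One detail the two check methods above rely on: ElasticsearchException accepts the same {} placeholders as ParameterizedMessage and formats them itself, so the separate LOGGER.error call was pure duplication; the message now reaches the log exactly once, wherever the exception is finally caught and logged. A before/after sketch of just that pattern; the wrapper methods and the jobId/error parameters are illustrative.

    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;
    import org.elasticsearch.ElasticsearchException;

    class DoubleLoggingSketch {
        // Before: format, log, then throw - the same message appears in the
        // log twice, here and again where the exception itself gets logged.
        static void beforeStyle(Logger logger, String jobId, String error) {
            ParameterizedMessage message =
                    new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", jobId, error);
            logger.error(message);
            throw new ElasticsearchException(message.getFormattedMessage());
        }

        // After: ElasticsearchException formats the {} placeholders itself,
        // and the message is logged once, by whoever catches the exception.
        static void afterStyle(String jobId, String error) {
            throw new ElasticsearchException("[{}] Unexpected death of autodetect: {}", jobId, error);
        }
    }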

View File: AutodetectProcessManager.java

@@ -473,6 +473,10 @@ public class AutodetectProcessManager extends AbstractComponent {
             communicator.close(restart, reason);
             processByAllocation.remove(allocationId);
         } catch (Exception e) {
+            // If the close failed because the process has explicitly been killed by us then just pass on that exception
+            if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
+                throw e;
+            }
             logger.warn("[" + jobId + "] Exception closing autodetect process", e);
             setJobState(jobTask, JobState.FAILED);
             throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
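
The CONFLICT check above is the receiving end of the contract set up in AutodetectCommunicator: a CONFLICT from close() means the process was killed on purpose, so it must propagate unchanged instead of tripping the generic failure path, which would log a warning, mark the job FAILED, and wrap the exception as a server error. Condensed as a sketch; closeProcess and markJobFailed are illustrative stand-ins for the real calls.

    import org.elasticsearch.ElasticsearchStatusException;
    import org.elasticsearch.rest.RestStatus;

    class ConflictAwareCloser {
        void close() {
            try {
                closeProcess();
            } catch (Exception e) {
                if (e instanceof ElasticsearchStatusException
                        && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
                    // Self-inflicted kill: pass the conflict on untouched
                    throw (ElasticsearchStatusException) e;
                }
                // Any other close failure really is fatal for the job
                markJobFailed();
                throw new RuntimeException("Exception closing autodetect process", e);
            }
        }

        private void closeProcess() { /* close the native process */ }
        private void markJobFailed() { /* set the job state to FAILED */ }
    }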

View File: DatafeedJobsIT.java

@@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
 import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
@@ -17,6 +18,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.ml.action.KillProcessAction;
 import org.elasticsearch.xpack.ml.action.PutJobAction;
 import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
+import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.ml.job.config.Job;
@@ -32,10 +34,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 
 public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
 
@@ -223,6 +227,59 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         });
     }
 
+    /**
+     * Stopping a lookback closes the associated job _after_ the stop call returns.
+     * This test ensures that a kill request submitted during this close doesn't
+     * put the job into the "failed" state.
+     */
+    public void testStopLookbackFollowedByProcessKill() throws Exception {
+        client().admin().indices().prepareCreate("data")
+                .addMapping("type", "time", "type=date")
+                .get();
+        long numDocs = randomIntBetween(1024, 2048);
+        long now = System.currentTimeMillis();
+        long oneWeekAgo = now - 604800000;
+        long twoWeeksAgo = oneWeekAgo - 604800000;
+        indexDocs(logger, "data", numDocs, twoWeeksAgo, oneWeekAgo);
+
+        Job.Builder job = createScheduledJob("lookback-job-stopped-then-killed");
+        registerJob(job);
+        PutJobAction.Response putJobResponse = putJob(job);
+        assertTrue(putJobResponse.isAcknowledged());
+        assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
+        openJob(job.getId());
+        assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
+
+        List<String> t = Collections.singletonList("data");
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), t);
+        // Use lots of chunks so we have time to stop the lookback before it completes
+        datafeedConfigBuilder.setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS)));
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+        registerDatafeed(datafeedConfig);
+        assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
+        startDatafeed(datafeedConfig.getId(), 0L, now);
+
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCounts(job.getId());
+            assertThat(dataCounts.getProcessedRecordCount(), greaterThan(0L));
+        }, 60, TimeUnit.SECONDS);
+
+        stopDatafeed(datafeedConfig.getId());
+
+        // At this point, stopping the datafeed will have submitted a request for the job to close.
+        // Depending on thread scheduling, the following kill request might overtake it. The Thread.sleep()
+        // call here makes it more likely; to make it inevitable for testing also add a Thread.sleep(10)
+        // immediately before the checkProcessIsAlive() call in AutodetectCommunicator.close().
+        Thread.sleep(randomIntBetween(1, 9));
+
+        KillProcessAction.Request killRequest = new KillProcessAction.Request(job.getId());
+        client().execute(KillProcessAction.INSTANCE, killRequest).actionGet();
+
+        // This should close very quickly, as we killed the process. If the job goes into the "failed"
+        // state that's wrong and this test will fail.
+        waitUntilJobIsClosed(job.getId(), TimeValue.timeValueSeconds(2));
+    }
+
     private void startRealtime(String jobId) throws Exception {
         client().admin().indices().prepareCreate("data")
                 .addMapping("type", "time", "type=date")
View File: MlNativeAutodetectIntegTestCase.java

@@ -233,7 +233,12 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
     }
 
     protected void waitUntilJobIsClosed(String jobId) throws Exception {
-        assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), 30, TimeUnit.SECONDS);
+        waitUntilJobIsClosed(jobId, TimeValue.timeValueSeconds(30));
+    }
+
+    protected void waitUntilJobIsClosed(String jobId, TimeValue waitTime) throws Exception {
+        assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)),
+                waitTime.getMillis(), TimeUnit.MILLISECONDS);
     }
 
     protected List<Job> getJob(String jobId) {