[ML] Kill autodetect on error writing headers (elastic/x-pack-elasticsearch#1730)

If an exception occurs while sending the initial setup messages to the autodetect
such that it fails rather than reaching the open state then the autodetect process
needs to be killed to prevent it hogging resources.

Relates elastic/x-pack-elasticsearch#1684

Original commit: elastic/x-pack-elasticsearch@1ee80ed9b0
This commit is contained in:
David Roberts 2017-06-15 14:03:41 +01:00 committed by GitHub
parent b748da1880
commit 291aa27562
2 changed files with 13 additions and 19 deletions

View File

@ -58,7 +58,6 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
@ -128,8 +127,8 @@ public class AutodetectProcessManager extends AbstractComponent {
if (numJobs != 0) {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
for (Map.Entry<Long, AutodetectCommunicator> entry : autoDetectCommunicatorByJob.entrySet()) {
closeJob(entry.getValue().getJobTask(), false, reason);
for (AutodetectCommunicator communicator : autoDetectCommunicatorByJob.values()) {
closeJob(communicator.getJobTask(), false, reason);
}
}
}
@ -262,15 +261,18 @@ public class AutodetectProcessManager extends AbstractComponent {
communicator.writeJobInputHeader();
setJobState(jobTask, JobState.OPENED);
} catch (Exception e1) {
if (e1 instanceof ElasticsearchStatusException) {
logger.info(e1.getMessage());
} else {
String msg = String.format(Locale.ROOT, "[%s] exception while opening job", jobId);
logger.error(msg, e1);
// No need to log here as the persistent task framework will log it
try {
// Don't leave a partially initialised process hanging around
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
if (communicator != null) {
communicator.killProcess(false, false);
}
} finally {
setJobState(jobTask, JobState.FAILED, e2 -> handler.accept(e1));
}
}
}
});
}, e1 -> {
logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);

View File

@ -11,18 +11,15 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -107,9 +104,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
int numFields = outFieldIndexes.size();
String[] record = new String[numFields];
Iterator<Map.Entry<String, Integer>> itr = outFieldIndexes.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, Integer> entry = itr.next();
for (Map.Entry<String, Integer> entry : outFieldIndexes.entrySet()) {
record[entry.getValue()] = entry.getKey();
}
@ -210,15 +205,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
* Time is the first field and the last is the control field
*/
protected final Map<String, Integer> outputFieldIndexes() {
Map<String, Integer> fieldIndexes = new HashMap<String, Integer>();
Map<String, Integer> fieldIndexes = new HashMap<>();
// time field
fieldIndexes.put(dataDescription.getTimeField(), TIME_FIELD_OUT_INDEX);
int index = TIME_FIELD_OUT_INDEX + 1;
List<String> analysisFields = analysisConfig.analysisFields();
Collections.sort(analysisFields);
for (String field : analysisConfig.analysisFields()) {
fieldIndexes.put(field, index++);
}