Merge branch 'master' into index-lifecycle
This commit is contained in:
commit
c5d31e30dd
|
@ -22,8 +22,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -84,20 +82,7 @@ public class NativeController {
|
|||
}
|
||||
|
||||
public Map<String, Object> getNativeCodeInfo() throws TimeoutException {
|
||||
String copyrightMessage = cppLogHandler.getCppCopyright(CONTROLLER_CONNECT_TIMEOUT);
|
||||
Matcher matcher = Pattern.compile("Version (.+) \\(Build ([^)]+)\\) Copyright ").matcher(copyrightMessage);
|
||||
if (matcher.find()) {
|
||||
Map<String, Object> info = new HashMap<>(2);
|
||||
info.put("version", matcher.group(1));
|
||||
info.put("build_hash", matcher.group(2));
|
||||
return info;
|
||||
} else {
|
||||
// If this happens it probably means someone has changed the format in lib/ver/CBuildInfo.cc
|
||||
// in the machine-learning-cpp repo without changing the pattern above to match
|
||||
String msg = "Unexpected native controller process copyright format: " + copyrightMessage;
|
||||
LOGGER.error(msg);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
return cppLogHandler.getNativeCodeInfo(CONTROLLER_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
public void startProcess(List<String> command) throws IOException {
|
||||
|
|
|
@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.logging;
|
|||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -30,10 +30,15 @@ import java.time.Duration;
|
|||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Handle a stream of C++ log messages that arrive via a named pipe in JSON format.
|
||||
|
@ -181,6 +186,26 @@ public class CppLogMessageHandler implements Closeable {
|
|||
return cppCopyright;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts version information from the copyright string which assumes a certain format.
|
||||
*/
|
||||
public Map<String, Object> getNativeCodeInfo(Duration timeout) throws TimeoutException {
|
||||
String copyrightMessage = getCppCopyright(timeout);
|
||||
Matcher matcher = Pattern.compile("Version (.+) \\(Build ([^)]+)\\) Copyright ").matcher(copyrightMessage);
|
||||
if (matcher.find()) {
|
||||
Map<String, Object> info = new HashMap<>(2);
|
||||
info.put("version", matcher.group(1));
|
||||
info.put("build_hash", matcher.group(2));
|
||||
return info;
|
||||
} else {
|
||||
// If this happens it probably means someone has changed the format in lib/ver/CBuildInfo.cc
|
||||
// in the ml-cpp repo without changing the pattern above to match
|
||||
String msg = "Unexpected native process copyright format: " + copyrightMessage;
|
||||
LOGGER.error(msg);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Expected to be called very infrequently.
|
||||
*/
|
||||
|
@ -281,8 +306,18 @@ public class CppLogMessageHandler implements Closeable {
|
|||
} catch (XContentParseException e) {
|
||||
String upstreamMessage = "Fatal error: '" + bytesRef.utf8ToString() + "'";
|
||||
if (upstreamMessage.contains("bad_alloc")) {
|
||||
upstreamMessage += ", process ran out of memory.";
|
||||
upstreamMessage += ", process ran out of memory";
|
||||
}
|
||||
|
||||
// add version information, so it's conveniently next to the crash log
|
||||
upstreamMessage += ", version: ";
|
||||
try {
|
||||
Map<String, Object> versionInfo = getNativeCodeInfo(Duration.ofMillis(10));
|
||||
upstreamMessage += String.format(Locale.ROOT, "%s (build %s)", versionInfo.get("version"), versionInfo.get("build_hash"));
|
||||
} catch (TimeoutException timeoutException) {
|
||||
upstreamMessage += "failed to retrieve";
|
||||
}
|
||||
|
||||
storeError(upstreamMessage);
|
||||
seenFatalError = true;
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.test;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||
|
@ -70,10 +71,12 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
|
@ -177,7 +180,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
public void _setup() throws Exception {
|
||||
if (timeWarped()) {
|
||||
timeWarp = new TimeWarp(internalCluster().getInstances(ScheduleTriggerEngineMock.class),
|
||||
(ClockMock)getInstanceFromMaster(Clock.class));
|
||||
(ClockMock)getInstanceFromMaster(Clock.class), logger);
|
||||
}
|
||||
|
||||
if (internalCluster().size() > 0) {
|
||||
|
@ -536,24 +539,28 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
|
||||
protected static class TimeWarp {
|
||||
|
||||
protected final Iterable<ScheduleTriggerEngineMock> schedulers;
|
||||
protected final ClockMock clock;
|
||||
private final List<ScheduleTriggerEngineMock> schedulers;
|
||||
private final ClockMock clock;
|
||||
private final Logger logger;
|
||||
|
||||
public TimeWarp(Iterable<ScheduleTriggerEngineMock> schedulers, ClockMock clock) {
|
||||
this.schedulers = schedulers;
|
||||
TimeWarp(Iterable<ScheduleTriggerEngineMock> schedulers, ClockMock clock, Logger logger) {
|
||||
this.schedulers = StreamSupport.stream(schedulers.spliterator(), false).collect(Collectors.toList());
|
||||
this.clock = clock;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public void trigger(String jobName) {
|
||||
schedulers.forEach(scheduler -> scheduler.trigger(jobName));
|
||||
trigger(jobName, 1, null);
|
||||
}
|
||||
|
||||
public ClockMock clock() {
|
||||
return clock;
|
||||
}
|
||||
|
||||
public void trigger(String id, int times, TimeValue timeValue) {
|
||||
schedulers.forEach(scheduler -> scheduler.trigger(id, times, timeValue));
|
||||
public void trigger(String watchId, int times, TimeValue timeValue) {
|
||||
boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue));
|
||||
String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId);
|
||||
assertThat(msg, isTriggered, is(true));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -77,18 +77,13 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
|
|||
return watches.remove(jobId) != null;
|
||||
}
|
||||
|
||||
public void trigger(String jobName) {
|
||||
trigger(jobName, 1, null);
|
||||
public boolean trigger(String jobName) {
|
||||
return trigger(jobName, 1, null);
|
||||
}
|
||||
|
||||
public void trigger(String jobName, int times) {
|
||||
trigger(jobName, times, null);
|
||||
}
|
||||
|
||||
public void trigger(String jobName, int times, TimeValue interval) {
|
||||
public boolean trigger(String jobName, int times, TimeValue interval) {
|
||||
if (watches.containsKey(jobName) == false) {
|
||||
logger.trace("not executing job [{}], not found", jobName);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int i = 0; i < times; i++) {
|
||||
|
@ -108,5 +103,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue