Fix the PID used for autodetect process license validation (elastic/elasticsearch#380)

The PID used needs to be the PID of autodetect's parent proecss.  The parent
is of course the controller daemon rather than the JVM.

Original commit: elastic/x-pack-elasticsearch@607481e1e2
This commit is contained in:
David Roberts 2016-11-24 13:51:54 +00:00 committed by GitHub
parent b8856eea54
commit 3f0b13cda9
7 changed files with 73 additions and 31 deletions

View File

@ -22,8 +22,12 @@ import org.elasticsearch.common.xcontent.XContentType;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration;
import java.util.Deque; import java.util.Deque;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* Handle a stream of C++ log messages that arrive via a named pipe in JSON format. * Handle a stream of C++ log messages that arrive via a named pipe in JSON format.
@ -40,8 +44,10 @@ public class CppLogMessageHandler implements Closeable {
private final int readBufSize; private final int readBufSize;
private final int errorStoreSize; private final int errorStoreSize;
private final Deque<String> errorStore; private final Deque<String> errorStore;
private final CountDownLatch pidLatch;
private volatile boolean hasLogStreamEnded; private volatile boolean hasLogStreamEnded;
private volatile boolean seenFatalError; private volatile boolean seenFatalError;
private volatile long pid;
/** /**
* @param jobId May be null or empty if the logs are from a process not associated with a job. * @param jobId May be null or empty if the logs are from a process not associated with a job.
@ -60,7 +66,8 @@ public class CppLogMessageHandler implements Closeable {
this.inputStream = Objects.requireNonNull(inputStream); this.inputStream = Objects.requireNonNull(inputStream);
this.readBufSize = readBufSize; this.readBufSize = readBufSize;
this.errorStoreSize = errorStoreSize; this.errorStoreSize = errorStoreSize;
this.errorStore = ConcurrentCollections.newDeque(); errorStore = ConcurrentCollections.newDeque();
pidLatch = new CountDownLatch(1);
hasLogStreamEnded = false; hasLogStreamEnded = false;
} }
@ -101,6 +108,29 @@ public class CppLogMessageHandler implements Closeable {
return seenFatalError; return seenFatalError;
} }
/**
* Get the process ID of the C++ process whose log messages are being read. This will
* arrive in the first log message logged by the C++ process. They all log their version
* number immediately on startup so it should not take long to arrive, but will not be
* available instantly after the process starts.
*/
public long getPid(Duration timeout) throws TimeoutException {
// There's an assumption here that 0 is not a valid PID. This is certainly true for
// userland processes. On Windows the "System Idle Process" has PID 0 and on *nix
// PID 0 is for "sched", which is part of the kernel.
if (pid == 0) {
try {
pidLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (pid == 0) {
throw new TimeoutException("Timed out waiting for C++ process PID");
}
}
return pid;
}
/** /**
* Expected to be called very infrequently. * Expected to be called very infrequently.
*/ */
@ -143,8 +173,13 @@ public class CppLogMessageHandler implements Closeable {
seenFatalError = true; seenFatalError = true;
} }
} }
long latestPid = msg.getPid();
if (pid != latestPid) {
pid = latestPid;
pidLatch.countDown();
}
// TODO: Is there a way to preserve the original timestamp when re-logging? // TODO: Is there a way to preserve the original timestamp when re-logging?
logger.log(level, "{}/{} {}@{} {}", msg.getLogger(), msg.getPid(), msg.getFile(), msg.getLine(), msg.getMessage()); logger.log(level, "{}/{} {}@{} {}", msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), msg.getMessage());
// TODO: Could send the message for indexing instead of or as well as logging it // TODO: Could send the message for indexing instead of or as well as logging it
} catch (IOException e) { } catch (IOException e) {
logger.warn("Failed to parse C++ log message: " + bytesRef.utf8ToString(), e); logger.warn("Failed to parse C++ log message: " + bytesRef.utf8ToString(), e);

View File

@ -16,6 +16,7 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException;
/** /**
@ -55,6 +56,10 @@ public class NativeController {
logTailThread.start(); logTailThread.start();
} }
public long getPid() throws TimeoutException {
return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT);
}
public void startProcess(List<String> command) throws IOException { public void startProcess(List<String> command) throws IOException {
// Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process
for (String arg : command) { for (String arg : command) {

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.DataDescription;
@ -153,14 +152,15 @@ public class ProcessCtrl {
return rng.nextInt(SECONDS_IN_HOUR); return rng.nextInt(SECONDS_IN_HOUR);
} }
public static List<String> buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime) { public static List<String> buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime,
long controllerPid) {
List<String> command = new ArrayList<>(); List<String> command = new ArrayList<>();
command.add(AUTODETECT_PATH); command.add(AUTODETECT_PATH);
String jobId = JOB_ID_ARG + job.getId(); String jobId = JOB_ID_ARG + job.getId();
command.add(jobId); command.add(jobId);
command.add(makeLicenseArg()); command.add(makeLicenseArg(controllerPid));
AnalysisConfig analysisConfig = job.getAnalysisConfig(); AnalysisConfig analysisConfig = job.getAnalysisConfig();
if (analysisConfig != null) { if (analysisConfig != null) {
@ -259,12 +259,12 @@ public class ProcessCtrl {
* Build the command to start the normalizer process. * Build the command to start the normalizer process.
*/ */
public static List<String> buildNormaliserCommand(Environment env, String jobId, String quantilesState, Integer bucketSpan, public static List<String> buildNormaliserCommand(Environment env, String jobId, String quantilesState, Integer bucketSpan,
boolean perPartitionNormalization, Logger logger) throws IOException { boolean perPartitionNormalization, long controllerPid) throws IOException {
List<String> command = new ArrayList<>(); List<String> command = new ArrayList<>();
command.add(NORMALIZE_PATH); command.add(NORMALIZE_PATH);
addIfNotNull(bucketSpan, BUCKET_SPAN_ARG, command); addIfNotNull(bucketSpan, BUCKET_SPAN_ARG, command);
command.add(makeLicenseArg()); command.add(makeLicenseArg(controllerPid));
command.add(LENGTH_ENCODED_INPUT_ARG); command.add(LENGTH_ENCODED_INPUT_ARG);
if (perPartitionNormalization) { if (perPartitionNormalization) {
command.add(PER_PARTITION_NORMALIZATION); command.add(PER_PARTITION_NORMALIZATION);
@ -305,12 +305,12 @@ public class ProcessCtrl {
} }
/** /**
* The number must be equal to the JVM PID modulo a magic number. * The number must be equal to the daemon controller's PID modulo a magic number.
*/ */
private static String makeLicenseArg() { private static String makeLicenseArg(long controllerPid) {
// Get a random int rather than long so we don't overflow when multiplying by VALIDATION_NUMBER // Get a random int rather than long so we don't overflow when multiplying by VALIDATION_NUMBER
long rand = Randomness.get().nextInt(); long rand = Randomness.get().nextInt();
long val = JvmInfo.jvmInfo().pid() + (((rand < 0) ? -rand : rand) + 1) * VALIDATION_NUMBER; long val = controllerPid + (((rand < 0) ? -rand : rand) + 1) * VALIDATION_NUMBER;
return LICENSE_VALIDATION_ARG + val; return LICENSE_VALIDATION_ARG + val;
} }
} }

View File

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException;
/** /**
* The autodetect process builder. * The autodetect process builder.
@ -103,9 +104,9 @@ public class AutodetectBuilder {
/** /**
* Requests that the controller daemon start an autodetect process. * Requests that the controller daemon start an autodetect process.
*/ */
public void build() throws IOException { public void build() throws IOException, TimeoutException {
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime); List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime, controller.getPid());
buildLimits(command); buildLimits(command);
buildModelDebugConfig(command); buildModelDebugConfig(command);

View File

@ -35,6 +35,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
public class NativeAutodetectProcessFactory implements AutodetectProcessFactory { public class NativeAutodetectProcessFactory implements AutodetectProcessFactory {
@ -111,7 +112,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
autodetectBuilder.build(); autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
} catch (IOException e) { } catch (IOException | TimeoutException e) {
String msg = "Failed to launch process for job " + job.getId(); String msg = "Failed to launch process for job " + job.getId();
LOGGER.error(msg); LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e); throw ExceptionsHelper.serverError(msg, e);

View File

@ -14,10 +14,12 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
public class CppLogMessageHandlerTests extends ESTestCase { public class CppLogMessageHandlerTests extends ESTestCase {
public void testParse() throws IOException { public void testParse() throws IOException, TimeoutException {
String testData = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," String testData = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"uname -a : Darwin Davids-MacBook-Pro.local 15.6.0 Darwin Kernel " + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"uname -a : Darwin Davids-MacBook-Pro.local 15.6.0 Darwin Kernel "
@ -44,7 +46,10 @@ public class CppLogMessageHandlerTests extends ESTestCase {
try (CppLogMessageHandler handler = new CppLogMessageHandler(is, logger, 100, 3)) { try (CppLogMessageHandler handler = new CppLogMessageHandler(is, logger, 100, 3)) {
handler.tailStream(); handler.tailStream();
assertTrue(handler.hasLogStreamEnded());
assertEquals(10211L, handler.getPid(Duration.ofMillis(1)));
assertEquals("Did not understand verb 'a'\n", handler.getErrors()); assertEquals("Did not understand verb 'a'\n", handler.getErrors());
assertFalse(handler.seenFatalError());
} }
} }
} }

View File

@ -8,15 +8,12 @@ package org.elasticsearch.xpack.prelert.job.process;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime; import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.Job;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
@ -25,13 +22,11 @@ import java.util.List;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
public class ProcessCtrlTests extends ESTestCase { public class ProcessCtrlTests extends ESTestCase {
@Mock
private Logger logger;
@Before private final Logger logger = Mockito.mock(Logger.class);
public void setupMock() { // 4194304 is the maximum possible PID on Linux according to
logger = Mockito.mock(Logger.class); // http://web.archive.org/web/20111209081734/http://research.cs.wisc.edu/condor/condorg/linux_scalability.html
} private final long pid = randomIntBetween(2, 4194304);
public void testBuildAutodetectCommand() { public void testBuildAutodetectCommand() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
@ -58,7 +53,7 @@ public class ProcessCtrlTests extends ESTestCase {
job.setIgnoreDowntime(IgnoreDowntime.ONCE); job.setIgnoreDowntime(IgnoreDowntime.ONCE);
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
assertEquals(17, command.size()); assertEquals(17, command.size());
assertTrue(command.contains(ProcessCtrl.AUTODETECT_PATH)); assertTrue(command.contains(ProcessCtrl.AUTODETECT_PATH));
assertTrue(command.contains(ProcessCtrl.BATCH_SPAN_ARG + "100")); assertTrue(command.contains(ProcessCtrl.BATCH_SPAN_ARG + "100"));
@ -90,7 +85,7 @@ public class ProcessCtrlTests extends ESTestCase {
Environment env = new Environment(settings); Environment env = new Environment(settings);
Job.Builder job = buildJobBuilder("unit-test-job"); Job.Builder job = buildJobBuilder("unit-test-job");
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
assertTrue(command.contains(ProcessCtrl.TIME_FIELD_ARG + "time")); assertTrue(command.contains(ProcessCtrl.TIME_FIELD_ARG + "time"));
} }
@ -103,13 +98,13 @@ public class ProcessCtrlTests extends ESTestCase {
int expectedPersistInterval = 10800 + ProcessCtrl.calculateStaggeringInterval(job.getId()); int expectedPersistInterval = 10800 + ProcessCtrl.calculateStaggeringInterval(job.getId());
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
assertFalse(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); assertFalse(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
env = new Environment(settings); env = new Environment(settings);
command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
} }
@ -119,7 +114,7 @@ public class ProcessCtrlTests extends ESTestCase {
settings); settings);
Job.Builder job = buildJobBuilder("foo"); Job.Builder job = buildJobBuilder("foo");
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
assertFalse(command.contains("--ignoreDowntime")); assertFalse(command.contains("--ignoreDowntime"));
} }
@ -130,7 +125,7 @@ public class ProcessCtrlTests extends ESTestCase {
settings); settings);
Job.Builder job = buildJobBuilder("foo"); Job.Builder job = buildJobBuilder("foo");
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true); List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid);
assertTrue(command.contains("--ignoreDowntime")); assertTrue(command.contains("--ignoreDowntime"));
} }
@ -140,7 +135,7 @@ public class ProcessCtrlTests extends ESTestCase {
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build()); Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
String jobId = "unit-test-job"; String jobId = "unit-test-job";
List<String> command = ProcessCtrl.buildNormaliserCommand(env, jobId, null, 300, true, logger); List<String> command = ProcessCtrl.buildNormaliserCommand(env, jobId, null, 300, true, pid);
assertEquals(5, command.size()); assertEquals(5, command.size());
assertTrue(command.contains(ProcessCtrl.NORMALIZE_PATH)); assertTrue(command.contains(ProcessCtrl.NORMALIZE_PATH));
assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "300")); assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "300"));
@ -159,7 +154,7 @@ public class ProcessCtrlTests extends ESTestCase {
return false; return false;
} }
long val = Long.parseLong(argAndVal[1]); long val = Long.parseLong(argAndVal[1]);
if ((val % ProcessCtrl.VALIDATION_NUMBER) != (JvmInfo.jvmInfo().pid() % ProcessCtrl.VALIDATION_NUMBER)) { if ((val % ProcessCtrl.VALIDATION_NUMBER) != (pid % ProcessCtrl.VALIDATION_NUMBER)) {
return false; return false;
} }
} }