From 3f0b13cda944c7368fa978ba35a7c34336a21569 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 24 Nov 2016 13:51:54 +0000 Subject: [PATCH] Fix the PID used for autodetect process license validation (elastic/elasticsearch#380) The PID used needs to be the PID of autodetect's parent proecss. The parent is of course the controller daemon rather than the JVM. Original commit: elastic/x-pack-elasticsearch@607481e1e2359a295c109d3bed7989d3ab717a02 --- .../job/logging/CppLogMessageHandler.java | 39 ++++++++++++++++++- .../prelert/job/process/NativeController.java | 5 +++ .../prelert/job/process/ProcessCtrl.java | 16 ++++---- .../process/autodetect/AutodetectBuilder.java | 5 ++- .../NativeAutodetectProcessFactory.java | 3 +- .../logging/CppLogMessageHandlerTests.java | 7 +++- .../prelert/job/process/ProcessCtrlTests.java | 29 ++++++-------- 7 files changed, 73 insertions(+), 31 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java index 50644681fa4..6ddc9d0454a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java @@ -22,8 +22,12 @@ import org.elasticsearch.common.xcontent.XContentType; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.time.Duration; import java.util.Deque; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Handle a stream of C++ log messages that arrive via a named pipe in JSON format. @@ -40,8 +44,10 @@ public class CppLogMessageHandler implements Closeable { private final int readBufSize; private final int errorStoreSize; private final Deque errorStore; + private final CountDownLatch pidLatch; private volatile boolean hasLogStreamEnded; private volatile boolean seenFatalError; + private volatile long pid; /** * @param jobId May be null or empty if the logs are from a process not associated with a job. @@ -60,7 +66,8 @@ public class CppLogMessageHandler implements Closeable { this.inputStream = Objects.requireNonNull(inputStream); this.readBufSize = readBufSize; this.errorStoreSize = errorStoreSize; - this.errorStore = ConcurrentCollections.newDeque(); + errorStore = ConcurrentCollections.newDeque(); + pidLatch = new CountDownLatch(1); hasLogStreamEnded = false; } @@ -101,6 +108,29 @@ public class CppLogMessageHandler implements Closeable { return seenFatalError; } + /** + * Get the process ID of the C++ process whose log messages are being read. This will + * arrive in the first log message logged by the C++ process. They all log their version + * number immediately on startup so it should not take long to arrive, but will not be + * available instantly after the process starts. + */ + public long getPid(Duration timeout) throws TimeoutException { + // There's an assumption here that 0 is not a valid PID. This is certainly true for + // userland processes. On Windows the "System Idle Process" has PID 0 and on *nix + // PID 0 is for "sched", which is part of the kernel. + if (pid == 0) { + try { + pidLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (pid == 0) { + throw new TimeoutException("Timed out waiting for C++ process PID"); + } + } + return pid; + } + /** * Expected to be called very infrequently. */ @@ -143,8 +173,13 @@ public class CppLogMessageHandler implements Closeable { seenFatalError = true; } } + long latestPid = msg.getPid(); + if (pid != latestPid) { + pid = latestPid; + pidLatch.countDown(); + } // TODO: Is there a way to preserve the original timestamp when re-logging? - logger.log(level, "{}/{} {}@{} {}", msg.getLogger(), msg.getPid(), msg.getFile(), msg.getLine(), msg.getMessage()); + logger.log(level, "{}/{} {}@{} {}", msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), msg.getMessage()); // TODO: Could send the message for indexing instead of or as well as logging it } catch (IOException e) { logger.warn("Failed to parse C++ log message: " + bytesRef.utf8ToString(), e); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/NativeController.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/NativeController.java index a1dee37ec86..753c7c4ab75 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/NativeController.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/NativeController.java @@ -16,6 +16,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; +import java.util.concurrent.TimeoutException; /** @@ -55,6 +56,10 @@ public class NativeController { logTailThread.start(); } + public long getPid() throws TimeoutException { + return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT); + } + public void startProcess(List command) throws IOException { // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process for (String arg : command) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrl.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrl.java index a8428209c71..8902890ed89 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrl.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrl.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataDescription; @@ -153,14 +152,15 @@ public class ProcessCtrl { return rng.nextInt(SECONDS_IN_HOUR); } - public static List buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime) { + public static List buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime, + long controllerPid) { List command = new ArrayList<>(); command.add(AUTODETECT_PATH); String jobId = JOB_ID_ARG + job.getId(); command.add(jobId); - command.add(makeLicenseArg()); + command.add(makeLicenseArg(controllerPid)); AnalysisConfig analysisConfig = job.getAnalysisConfig(); if (analysisConfig != null) { @@ -259,12 +259,12 @@ public class ProcessCtrl { * Build the command to start the normalizer process. */ public static List buildNormaliserCommand(Environment env, String jobId, String quantilesState, Integer bucketSpan, - boolean perPartitionNormalization, Logger logger) throws IOException { + boolean perPartitionNormalization, long controllerPid) throws IOException { List command = new ArrayList<>(); command.add(NORMALIZE_PATH); addIfNotNull(bucketSpan, BUCKET_SPAN_ARG, command); - command.add(makeLicenseArg()); + command.add(makeLicenseArg(controllerPid)); command.add(LENGTH_ENCODED_INPUT_ARG); if (perPartitionNormalization) { command.add(PER_PARTITION_NORMALIZATION); @@ -305,12 +305,12 @@ public class ProcessCtrl { } /** - * The number must be equal to the JVM PID modulo a magic number. + * The number must be equal to the daemon controller's PID modulo a magic number. */ - private static String makeLicenseArg() { + private static String makeLicenseArg(long controllerPid) { // Get a random int rather than long so we don't overflow when multiplying by VALIDATION_NUMBER long rand = Randomness.get().nextInt(); - long val = JvmInfo.jvmInfo().pid() + (((rand < 0) ? -rand : rand) + 1) * VALIDATION_NUMBER; + long val = controllerPid + (((rand < 0) ? -rand : rand) + 1) * VALIDATION_NUMBER; return LICENSE_VALIDATION_ARG + val; } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectBuilder.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectBuilder.java index 131d73ea441..26c4922cb59 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectBuilder.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectBuilder.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeoutException; /** * The autodetect process builder. @@ -103,9 +104,9 @@ public class AutodetectBuilder { /** * Requests that the controller daemon start an autodetect process. */ - public void build() throws IOException { + public void build() throws IOException, TimeoutException { - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime, controller.getPid()); buildLimits(command); buildModelDebugConfig(command); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/NativeAutodetectProcessFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/NativeAutodetectProcessFactory.java index 186fe28b4cc..5bb1aefc48e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; public class NativeAutodetectProcessFactory implements AutodetectProcessFactory { @@ -111,7 +112,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory autodetectBuilder.build(); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); - } catch (IOException e) { + } catch (IOException | TimeoutException e) { String msg = "Failed to launch process for job " + job.getId(); LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java index 534f5773c2f..7e50ed2776c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java @@ -14,10 +14,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.TimeoutException; public class CppLogMessageHandlerTests extends ESTestCase { - public void testParse() throws IOException { + public void testParse() throws IOException, TimeoutException { String testData = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"uname -a : Darwin Davids-MacBook-Pro.local 15.6.0 Darwin Kernel " @@ -44,7 +46,10 @@ public class CppLogMessageHandlerTests extends ESTestCase { try (CppLogMessageHandler handler = new CppLogMessageHandler(is, logger, 100, 3)) { handler.tailStream(); + assertTrue(handler.hasLogStreamEnded()); + assertEquals(10211L, handler.getPid(Duration.ofMillis(1))); assertEquals("Did not understand verb 'a'\n", handler.getErrors()); + assertFalse(handler.seenFatalError()); } } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java index 28fd6816657..cd1abbe861b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java @@ -8,15 +8,12 @@ package org.elasticsearch.xpack.prelert.job.process; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.IgnoreDowntime; import org.elasticsearch.xpack.prelert.job.Job; -import org.junit.Before; -import org.mockito.Mock; import org.mockito.Mockito; import java.io.IOException; import java.util.Collections; @@ -25,13 +22,11 @@ import java.util.List; import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; public class ProcessCtrlTests extends ESTestCase { - @Mock - private Logger logger; - @Before - public void setupMock() { - logger = Mockito.mock(Logger.class); - } + private final Logger logger = Mockito.mock(Logger.class); + // 4194304 is the maximum possible PID on Linux according to + // http://web.archive.org/web/20111209081734/http://research.cs.wisc.edu/condor/condorg/linux_scalability.html + private final long pid = randomIntBetween(2, 4194304); public void testBuildAutodetectCommand() { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); @@ -58,7 +53,7 @@ public class ProcessCtrlTests extends ESTestCase { job.setIgnoreDowntime(IgnoreDowntime.ONCE); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); assertEquals(17, command.size()); assertTrue(command.contains(ProcessCtrl.AUTODETECT_PATH)); assertTrue(command.contains(ProcessCtrl.BATCH_SPAN_ARG + "100")); @@ -90,7 +85,7 @@ public class ProcessCtrlTests extends ESTestCase { Environment env = new Environment(settings); Job.Builder job = buildJobBuilder("unit-test-job"); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); assertTrue(command.contains(ProcessCtrl.TIME_FIELD_ARG + "time")); } @@ -103,13 +98,13 @@ public class ProcessCtrlTests extends ESTestCase { int expectedPersistInterval = 10800 + ProcessCtrl.calculateStaggeringInterval(job.getId()); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); assertFalse(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); env = new Environment(settings); - command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); + command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); } @@ -119,7 +114,7 @@ public class ProcessCtrlTests extends ESTestCase { settings); Job.Builder job = buildJobBuilder("foo"); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); assertFalse(command.contains("--ignoreDowntime")); } @@ -130,7 +125,7 @@ public class ProcessCtrlTests extends ESTestCase { settings); Job.Builder job = buildJobBuilder("foo"); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid); assertTrue(command.contains("--ignoreDowntime")); } @@ -140,7 +135,7 @@ public class ProcessCtrlTests extends ESTestCase { Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build()); String jobId = "unit-test-job"; - List command = ProcessCtrl.buildNormaliserCommand(env, jobId, null, 300, true, logger); + List command = ProcessCtrl.buildNormaliserCommand(env, jobId, null, 300, true, pid); assertEquals(5, command.size()); assertTrue(command.contains(ProcessCtrl.NORMALIZE_PATH)); assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "300")); @@ -159,7 +154,7 @@ public class ProcessCtrlTests extends ESTestCase { return false; } long val = Long.parseLong(argAndVal[1]); - if ((val % ProcessCtrl.VALIDATION_NUMBER) != (JvmInfo.jvmInfo().pid() % ProcessCtrl.VALIDATION_NUMBER)) { + if ((val % ProcessCtrl.VALIDATION_NUMBER) != (pid % ProcessCtrl.VALIDATION_NUMBER)) { return false; } }