diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java index 8ca4334a8bb..5288d0925de 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandler.java @@ -159,6 +159,9 @@ public class CppLogMessageHandler implements Closeable { } from = nextMarker + 1; } + if (from >= bytesRef.length()) { + return null; + } return bytesRef.slice(from, bytesRef.length() - from); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java index 2bb11329d39..b9859d78b0a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java @@ -64,6 +64,9 @@ public class StateProcessor extends AbstractComponent { persister.persistBulkState(jobId, bytesRef.slice(from, nextZeroByte - from)); from = nextZeroByte + 1; } + if (from >= bytesRef.length()) { + return null; + } return bytesRef.slice(from, bytesRef.length() - from); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java index 08de6a87e17..43c5473c597 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/logging/CppLogMessageHandlerTests.java @@ -35,14 +35,17 @@ public class CppLogMessageHandlerTests extends ESTestCase { + "{\"logger\":\"controller\",\"timestamp\":1478261169065,\"level\":\"DEBUG\",\"pid\":10211,\"thread\":\"0x7fff7d2a8000\"," + "\"message\":\"Prelert controller exiting\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":147}\n"; - InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); - try (CppLogMessageHandler handler = new CppLogMessageHandler(is, "_id", 100, 3)) { - handler.tailStream(); + // Try different buffer sizes to smoke out edge case problems in the buffer management + for (int readBufSize : new int[] { 11, 42, 101, 1024, 9999 }) { + InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + try (CppLogMessageHandler handler = new CppLogMessageHandler(is, "_id", readBufSize, 3)) { + handler.tailStream(); - assertTrue(handler.hasLogStreamEnded()); - assertEquals(10211L, handler.getPid(Duration.ofMillis(1))); - assertEquals("Did not understand verb 'a'\n", handler.getErrors()); - assertFalse(handler.seenFatalError()); + assertTrue(handler.hasLogStreamEnded()); + assertEquals(10211L, handler.getPid(Duration.ofMillis(1))); + assertEquals("Did not understand verb 'a'\n", handler.getErrors()); + assertFalse(handler.seenFatalError()); + } } } }