diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java
index cfa946a730b..21614c60a15 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java
@@ -279,11 +279,10 @@ public class JobResultsPersister extends AbstractComponent {
             byte[] bytes = bytesRef.toBytesRef().bytes;
             logger.trace("[{}] ES API CALL: bulk index", jobId);
             client.prepareBulk()
-                    .add(bytes, 0, bytes.length)
-                    .execute().actionGet();
+                .add(bytes, 0, bytes.length)
+                .execute().actionGet();
         } catch (Exception e) {
-            logger.error((org.apache.logging.log4j.util.Supplier<?>)
-                    () -> new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
+            logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
         }
     }
 
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java
index b9859d78b0a..b4ea7514683 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessor.java
@@ -32,14 +32,17 @@ public class StateProcessor extends AbstractComponent {
     public void process(String jobId, InputStream in) {
         try {
             BytesReference bytesRef = null;
+            int searchFrom = 0;
             byte[] readBuf = new byte[READ_BUF_SIZE];
             for (int bytesRead = in.read(readBuf); bytesRead != -1; bytesRead = in.read(readBuf)) {
                 if (bytesRef == null) {
+                    searchFrom = 0;
                     bytesRef = new BytesArray(readBuf, 0, bytesRead);
                 } else {
+                    searchFrom = bytesRef.length();
                     bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
                 }
-                bytesRef = splitAndPersist(jobId, bytesRef);
+                bytesRef = splitAndPersist(jobId, bytesRef, searchFrom);
                 readBuf = new byte[READ_BUF_SIZE];
             }
         } catch (IOException e) {
@@ -53,25 +56,25 @@ public class StateProcessor extends AbstractComponent {
      * data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON
      * (as would be uploaded to the public REST API) separated by zero bytes ('\0').
      */
-    private BytesReference splitAndPersist(String jobId, BytesReference bytesRef) {
-        int from = 0;
+    private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) {
+        int splitFrom = 0;
         while (true) {
-            int nextZeroByte = findNextZeroByte(bytesRef, from);
+            int nextZeroByte = findNextZeroByte(bytesRef, searchFrom, splitFrom);
             if (nextZeroByte == -1) {
                 // No more zero bytes in this block
                 break;
             }
-            persister.persistBulkState(jobId, bytesRef.slice(from, nextZeroByte - from));
-            from = nextZeroByte + 1;
+            persister.persistBulkState(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
+            splitFrom = nextZeroByte + 1;
         }
-        if (from >= bytesRef.length()) {
+        if (splitFrom >= bytesRef.length()) {
             return null;
         }
-        return bytesRef.slice(from, bytesRef.length() - from);
+        return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
     }
 
-    private static int findNextZeroByte(BytesReference bytesRef, int from) {
-        for (int i = from; i < bytesRef.length(); ++i) {
+    private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
+        for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
             if (bytesRef.get(i) == 0) {
                 return i;
             }
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessorTests.java
index b045578ac96..3a6a42cbcaf 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/StateProcessorTests.java
@@ -16,7 +16,9 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -36,6 +38,9 @@ public class StateProcessorTests extends ESTestCase {
             + "third data\n"
             + "\0";
 
+    private static final int NUM_LARGE_DOCS = 2;
+    private static final int LARGE_DOC_SIZE = 16000000;
+
     public void testStateRead() throws IOException {
         ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
 
@@ -53,4 +58,31 @@ public class StateProcessorTests extends ESTestCase {
         assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
         assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
     }
-}
\ No newline at end of file
+
+    /**
+     * This test is designed to pick up N-squared processing in the state consumption code.
+     * The size of the state document is comparable to those that the C++ code will create for a huge model.
+     */
+    public void testLargeStateRead() throws Exception {
+        StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
+        for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
+            builder.append("header").append(docNum).append("\n");
+            for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
+                builder.append("data");
+            }
+            builder.append("\n\0");
+        }
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
+
+        JobResultsPersister persister = Mockito.mock(JobResultsPersister.class);
+
+        StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
+
+        // 5 seconds is an overestimate to avoid spurious failures due to VM stalls - on a
+        // reasonable spec laptop this should take around 1 second
+        assertBusy(() -> stateParser.process("_id", stream), 5, TimeUnit.SECONDS);
+
+        verify(persister, times(NUM_LARGE_DOCS)).persistBulkState(eq("_id"), any());
+    }
+}
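Note: the sketch below is not part of the change above; it restates the technique the diff introduces in isolation. Before this change, findNextZeroByte restarted from the last split point on every read, so the not-yet-split tail of a large state document was re-scanned once per read buffer, which is quadratic overall. Passing the offset at which the newly appended bytes begin (searchFrom) keeps the total scan linear. The class and method names here (ZeroByteSplitterSketch, splitAndEmit) are invented for illustration, and plain byte arrays stand in for BytesReference.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZeroByteSplitterSketch {

    // Splits the accumulated bytes on '\0', adds each complete document to 'out',
    // and returns the unconsumed tail. 'searchFrom' is the offset where the newly
    // appended bytes begin, so bytes scanned on earlier calls are not scanned again.
    static byte[] splitAndEmit(byte[] accumulated, int searchFrom, List<String> out) {
        int splitFrom = 0;
        while (true) {
            int nextZeroByte = findNextZeroByte(accumulated, searchFrom, splitFrom);
            if (nextZeroByte == -1) {
                break; // no separator in the bytes we have not looked at yet
            }
            out.add(new String(accumulated, splitFrom, nextZeroByte - splitFrom, StandardCharsets.UTF_8));
            splitFrom = nextZeroByte + 1;
        }
        if (splitFrom >= accumulated.length) {
            return new byte[0]; // everything was consumed
        }
        return Arrays.copyOfRange(accumulated, splitFrom, accumulated.length); // incomplete tail
    }

    // Start at whichever is further along: the first byte never scanned before
    // (searchFrom) or the current split point (splitFrom).
    static int findNextZeroByte(byte[] buf, int searchFrom, int splitFrom) {
        for (int i = Math.max(searchFrom, splitFrom); i < buf.length; ++i) {
            if (buf[i] == 0) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        byte[] pending = new byte[0];
        // Simulate chunked reads of a stream holding two '\0'-separated documents.
        String[] chunks = {"first doc", "ument\0second", " document\0"};
        for (String chunk : chunks) {
            byte[] chunkBytes = chunk.getBytes(StandardCharsets.UTF_8);
            int searchFrom = pending.length; // newly appended bytes start here
            byte[] combined = Arrays.copyOf(pending, pending.length + chunkBytes.length);
            System.arraycopy(chunkBytes, 0, combined, pending.length, chunkBytes.length);
            pending = splitAndEmit(combined, searchFrom, docs);
        }
        System.out.println(docs); // prints [first document, second document]
    }
}

The Math.max(searchFrom, splitFrom) matters when several separators arrive in one chunk: after a split, splitFrom can move past searchFrom, and the next scan must start there rather than re-finding the separator that was just consumed.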