From 3986235d932914b0921fb2ef4436649378fa6bfa Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 7 Apr 2017 15:57:21 +0100 Subject: [PATCH] [ML] Fix large state persistence performance (elastic/x-pack-elasticsearch#1004) There was a problem with the way CompositeBytesReference was used in the StateProcessor. In the case of a large state document we ended up with a deeply nested CompositeBytesReference that then caused a deep stack and N^2 processing in the bulk action processor. This change uses an intermediate list of byte arrays that get combined into a single CompositeBytesReference to avoid the deep nesting. Additionally, errors in state processing now bubble up to close the state stream, which will cause the C++ process to stop trying to persist more state. Finally, the results processor also times out after a similar period (30 minutes) to that used by the state processor. Original commit: elastic/x-pack-elasticsearch@ceb31481d140b3a90acc5d57a29c32c25dd8d916 --- .../autodetect/AutodetectCommunicator.java | 9 ++- .../autodetect/NativeAutodetectProcess.java | 9 ++- .../output/AutoDetectResultProcessor.java | 10 +++- .../autodetect/output/StateProcessor.java | 58 ++++++++++--------- .../AutoDetectResultProcessorTests.java | 3 +- .../output/StateProcessorTests.java | 2 +- 6 files changed, 53 insertions(+), 38 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index b39a1cad516..5459493bbcc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -126,9 +126,12 @@ public class AutodetectCommunicator implements Closeable { public void close(boolean restart, String reason) throws IOException { Future future = autodetectWorkerExecutor.submit(() -> { checkProcessIsAlive(); - autodetectProcess.close(); - autoDetectResultProcessor.awaitCompletion(); - handler.accept(restart ? new ElasticsearchException(reason) : null); + try { + autodetectProcess.close(); + autoDetectResultProcessor.awaitCompletion(); + } finally { + handler.accept(restart ? new ElasticsearchException(reason) : null); + } LOGGER.info("[{}] job closed", job.getId()); return null; }); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index ed4c6582f95..8c4a9fda4db 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -77,8 +77,7 @@ class NativeAutodetectProcess implements AutodetectProcess { try (CppLogMessageHandler h = cppLogHandler) { h.tailStream(); } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs", - new Object[] { jobId }), e); + LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs", jobId), e); } finally { if (processCloseInitiated == false) { // The log message doesn't say "crashed", as the process could have been killed @@ -89,7 +88,11 @@ class NativeAutodetectProcess implements AutodetectProcess { } }); stateProcessorFuture = executorService.submit(() -> { - stateProcessor.process(jobId, persistStream); + try (InputStream in = persistStream) { + stateProcessor.process(jobId, in); + } catch (IOException e) { + LOGGER.error(new ParameterizedMessage("[{}] Error reading autodetect state output", jobId), e); + } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 3c73d6f83de..3e79e61e36e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A runnable class that reads the autodetect process output in the @@ -222,9 +224,13 @@ public class AutoDetectResultProcessor { }); } - public void awaitCompletion() { + public void awaitCompletion() throws TimeoutException { try { - completionLatch.await(); + // Although the results won't take 30 minutes to finish, the pipe won't be closed + // until the state is persisted, and that can take a while + if (completionLatch.await(30, TimeUnit.MINUTES) == false) { + throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId); + } // Input stream has been completely processed at this point. // Wait for any updateModelSnapshotIdOnJob calls to complete. updateModelSnapshotIdSemaphore.acquire(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index da4dbd30fc9..c255e4b98f5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -14,10 +14,11 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; /** * Reads the autodetect state and persists via a bulk request @@ -32,24 +33,29 @@ public class StateProcessor extends AbstractComponent { this.client = client; } - public void process(String jobId, InputStream in) { - try { - BytesReference bytesRef = null; - int searchFrom = 0; - byte[] readBuf = new byte[READ_BUF_SIZE]; - for (int bytesRead = in.read(readBuf); bytesRead != -1; bytesRead = in.read(readBuf)) { - if (bytesRef == null) { - searchFrom = 0; - bytesRef = new BytesArray(readBuf, 0, bytesRead); - } else { - searchFrom = bytesRef.length(); - bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead)); - } - bytesRef = splitAndPersist(jobId, bytesRef, searchFrom); - readBuf = new byte[READ_BUF_SIZE]; + public void process(String jobId, InputStream in) throws IOException { + BytesReference bytesToDate = null; + List newBlocks = new ArrayList<>(); + byte[] readBuf = new byte[READ_BUF_SIZE]; + int searchFrom = 0; + // The original implementation of this loop created very deeply nested + // CompositeBytesReference objects, which caused problems for the bulk persister. + // This new implementation uses an intermediate List of blocks that don't contain + // end markers to avoid such deep nesting in the CompositeBytesReference that + // eventually gets created. + for (int bytesRead = in.read(readBuf); bytesRead != -1; bytesRead = in.read(readBuf)) { + BytesArray newBlock = new BytesArray(readBuf, 0, bytesRead); + newBlocks.add(newBlock); + if (findNextZeroByte(newBlock, 0, 0) == -1) { + searchFrom += bytesRead; + } else { + BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0])); + bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes); + bytesToDate = splitAndPersist(jobId, bytesToDate, searchFrom); + searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length(); + newBlocks.clear(); } - } catch (IOException e) { - logger.info(new ParameterizedMessage("[{}] Error reading autodetect state output", jobId), e); + readBuf = new byte[READ_BUF_SIZE]; } logger.info("[{}] State output finished", jobId); } @@ -59,7 +65,7 @@ public class StateProcessor extends AbstractComponent { * data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON * (as would be uploaded to the public REST API) separated by zero bytes ('\0'). */ - private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) { + private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) throws IOException { int splitFrom = 0; while (true) { int nextZeroByte = findNextZeroByte(bytesRef, searchFrom, splitFrom); @@ -77,15 +83,11 @@ public class StateProcessor extends AbstractComponent { return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom); } - void persist(String jobId, BytesReference bytes) { - try { - logger.trace("[{}] ES API CALL: bulk index", jobId); - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(bytes, null, null, XContentType.JSON); - client.bulk(bulkRequest).actionGet(); - } catch (Exception e) { - logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e); - } + void persist(String jobId, BytesReference bytes) throws IOException { + logger.trace("[{}] ES API CALL: bulk index", jobId); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(bytes, null, null, XContentType.JSON); + client.bulk(bulkRequest).actionGet(); } private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 73b1ac8a672..38ba450c450 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeoutException; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -292,7 +293,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verifyNoMoreInteractions(renormalizer); } - public void testAwaitCompletion() { + public void testAwaitCompletion() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); @SuppressWarnings("unchecked") Iterator iterator = mock(Iterator.class); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java index 629df3848ab..92adddcc318 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java @@ -47,7 +47,7 @@ public class StateProcessorTests extends ESTestCase { private StateProcessor stateProcessor; @Before - public void initialize() { + public void initialize() throws IOException { stateProcessor = spy(new StateProcessor(Settings.EMPTY, mock(Client.class))); doNothing().when(stateProcessor).persist(any(), any()); }