[ML] Fix large state persistence performance (elastic/x-pack-elasticsearch#1004)

There was a problem with the way CompositeBytesReference was used in the
StateProcessor.  In the case of a large state document we ended up with a
deeply nested CompositeBytesReference that then caused a deep call stack and
quadratic (N^2) processing in the bulk action processor.

This change uses an intermediate list of byte arrays that get combined into
a single CompositeBytesReference to avoid the deep nesting.

Additionally, errors in state processing now bubble up to close the state
stream, which will cause the C++ process to stop trying to persist more state.

Finally, the results processor now also times out after a period (30 minutes)
similar to that used by the state processor.

Original commit: elastic/x-pack-elasticsearch@ceb31481d1
This commit is contained in:
David Roberts 2017-04-07 15:57:21 +01:00 committed by GitHub
parent fbefaf5b6d
commit 3986235d93
6 changed files with 53 additions and 38 deletions

View File

@ -126,9 +126,12 @@ public class AutodetectCommunicator implements Closeable {
public void close(boolean restart, String reason) throws IOException {
Future<?> future = autodetectWorkerExecutor.submit(() -> {
checkProcessIsAlive();
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
handler.accept(restart ? new ElasticsearchException(reason) : null);
try {
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
} finally {
handler.accept(restart ? new ElasticsearchException(reason) : null);
}
LOGGER.info("[{}] job closed", job.getId());
return null;
});

View File

@ -77,8 +77,7 @@ class NativeAutodetectProcess implements AutodetectProcess {
try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs",
new Object[] { jobId }), e);
LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs", jobId), e);
} finally {
if (processCloseInitiated == false) {
// The log message doesn't say "crashed", as the process could have been killed
@ -89,7 +88,11 @@ class NativeAutodetectProcess implements AutodetectProcess {
}
});
stateProcessorFuture = executorService.submit(() -> {
stateProcessor.process(jobId, persistStream);
try (InputStream in = persistStream) {
stateProcessor.process(jobId, in);
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error reading autodetect state output", jobId), e);
}
});
}

View File

@ -34,6 +34,8 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A runnable class that reads the autodetect process output in the
@ -222,9 +224,13 @@ public class AutoDetectResultProcessor {
});
}
public void awaitCompletion() {
public void awaitCompletion() throws TimeoutException {
try {
completionLatch.await();
// Although the results won't take 30 minutes to finish, the pipe won't be closed
// until the state is persisted, and that can take a while
if (completionLatch.await(30, TimeUnit.MINUTES) == false) {
throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId);
}
// Input stream has been completely processed at this point.
// Wait for any updateModelSnapshotIdOnJob calls to complete.
updateModelSnapshotIdSemaphore.acquire();

View File

@ -14,10 +14,11 @@ import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Reads the autodetect state and persists via a bulk request
@ -32,24 +33,29 @@ public class StateProcessor extends AbstractComponent {
this.client = client;
}
public void process(String jobId, InputStream in) {
try {
BytesReference bytesRef = null;
int searchFrom = 0;
byte[] readBuf = new byte[READ_BUF_SIZE];
for (int bytesRead = in.read(readBuf); bytesRead != -1; bytesRead = in.read(readBuf)) {
if (bytesRef == null) {
searchFrom = 0;
bytesRef = new BytesArray(readBuf, 0, bytesRead);
} else {
searchFrom = bytesRef.length();
bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
}
bytesRef = splitAndPersist(jobId, bytesRef, searchFrom);
readBuf = new byte[READ_BUF_SIZE];
public void process(String jobId, InputStream in) throws IOException {
BytesReference bytesToDate = null;
List<BytesReference> newBlocks = new ArrayList<>();
byte[] readBuf = new byte[READ_BUF_SIZE];
int searchFrom = 0;
// The original implementation of this loop created very deeply nested
// CompositeBytesReference objects, which caused problems for the bulk persister.
// This new implementation uses an intermediate List of blocks that don't contain
// end markers to avoid such deep nesting in the CompositeBytesReference that
// eventually gets created.
for (int bytesRead = in.read(readBuf); bytesRead != -1; bytesRead = in.read(readBuf)) {
BytesArray newBlock = new BytesArray(readBuf, 0, bytesRead);
newBlocks.add(newBlock);
if (findNextZeroByte(newBlock, 0, 0) == -1) {
searchFrom += bytesRead;
} else {
BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0]));
bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes);
bytesToDate = splitAndPersist(jobId, bytesToDate, searchFrom);
searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length();
newBlocks.clear();
}
} catch (IOException e) {
logger.info(new ParameterizedMessage("[{}] Error reading autodetect state output", jobId), e);
readBuf = new byte[READ_BUF_SIZE];
}
logger.info("[{}] State output finished", jobId);
}
@ -59,7 +65,7 @@ public class StateProcessor extends AbstractComponent {
* data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON
* (as would be uploaded to the public REST API) separated by zero bytes ('\0').
*/
private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) {
private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) throws IOException {
int splitFrom = 0;
while (true) {
int nextZeroByte = findNextZeroByte(bytesRef, searchFrom, splitFrom);
@ -77,15 +83,11 @@ public class StateProcessor extends AbstractComponent {
return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
}
void persist(String jobId, BytesReference bytes) {
try {
logger.trace("[{}] ES API CALL: bulk index", jobId);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, null, null, XContentType.JSON);
client.bulk(bulkRequest).actionGet();
} catch (Exception e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
}
void persist(String jobId, BytesReference bytes) throws IOException {
logger.trace("[{}] ES API CALL: bulk index", jobId);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, null, null, XContentType.JSON);
client.bulk(bulkRequest).actionGet();
}
private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {

View File

@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@ -292,7 +293,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verifyNoMoreInteractions(renormalizer);
}
public void testAwaitCompletion() {
public void testAwaitCompletion() throws TimeoutException {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);

View File

@ -47,7 +47,7 @@ public class StateProcessorTests extends ESTestCase {
private StateProcessor stateProcessor;
@Before
public void initialize() {
public void initialize() throws IOException {
stateProcessor = spy(new StateProcessor(Settings.EMPTY, mock(Client.class)));
doNothing().when(stateProcessor).persist(any(), any());
}