Speed up state processing (elastic/elasticsearch#642)

There was an N-squared algorithm in the state processing code, which caused
persistence of large state documents to eventually time out. Large state
documents are read from the network in 8KB chunks, and the old code re-scanned
ALL previously read chunks for separators every time a new chunk arrived. The
fix remembers how far earlier scans got, so each read only searches the newly
arrived bytes.

Fixes elastic/elasticsearch#635

Original commit: elastic/x-pack-elasticsearch@c814858c2c
Authored by David Roberts on 2017-01-05 12:37:11 +00:00, committed by GitHub
parent c7cfa56aaf
commit 27c9f39bf5
3 changed files with 49 additions and 15 deletions
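
The shape of the fix is easiest to see in isolation. The sketch below is
hypothetical, not the actual x-pack code: the real StateProcessor works on
BytesReference/CompositeBytesReference and slices without copying, whereas
plain byte[] copies are used here only to keep the example self-contained.

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

class StateSplitterSketch {

    private static final int READ_BUF_SIZE = 8192;

    static void process(InputStream in) throws IOException {
        byte[] accumulated = new byte[0];
        byte[] readBuf = new byte[READ_BUF_SIZE];
        for (int n = in.read(readBuf); n != -1; n = in.read(readBuf)) {
            // Everything accumulated so far was scanned on earlier iterations, so
            // the next separator search can start at the old length. The old code
            // effectively restarted from offset 0 on every read.
            int searchFrom = accumulated.length;
            accumulated = concat(accumulated, readBuf, n);
            accumulated = splitAndPersist(accumulated, searchFrom);
        }
    }

    private static byte[] splitAndPersist(byte[] buf, int searchFrom) {
        int splitFrom = 0;
        int sep;
        while ((sep = findNextZeroByte(buf, searchFrom, splitFrom)) != -1) {
            persist(Arrays.copyOfRange(buf, splitFrom, sep)); // one complete document
            splitFrom = sep + 1;
        }
        return Arrays.copyOfRange(buf, splitFrom, buf.length); // carry over the unsplit tail
    }

    private static int findNextZeroByte(byte[] buf, int searchFrom, int splitFrom) {
        // Start at whichever is further along: past the bytes scanned on previous
        // reads (searchFrom), or past the last separator found in this call
        // (splitFrom). No byte is ever examined twice, so total scanning cost is
        // linear in the stream length instead of quadratic.
        for (int i = Math.max(searchFrom, splitFrom); i < buf.length; ++i) {
            if (buf[i] == 0) {
                return i;
            }
        }
        return -1;
    }

    private static byte[] concat(byte[] a, byte[] b, int bLen) {
        byte[] out = Arrays.copyOf(a, a.length + bLen);
        System.arraycopy(b, 0, out, a.length, bLen);
        return out;
    }

    private static void persist(byte[] doc) {
        // Stub: the real code hands the document to JobResultsPersister.persistBulkState().
    }
}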


@@ -279,11 +279,10 @@ public class JobResultsPersister extends AbstractComponent {
             byte[] bytes = bytesRef.toBytesRef().bytes;
             logger.trace("[{}] ES API CALL: bulk index", jobId);
             client.prepareBulk()
-                    .add(bytes, 0, bytes.length)
-                    .execute().actionGet();
+                  .add(bytes, 0, bytes.length)
+                  .execute().actionGet();
         } catch (Exception e) {
-            logger.error((org.apache.logging.log4j.util.Supplier<?>)
-                    () -> new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
+            logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
         }
     }


@@ -32,14 +32,17 @@ public class StateProcessor extends AbstractComponent {
     public void process(String jobId, InputStream in) {
         try {
             BytesReference bytesRef = null;
+            int searchFrom = 0;
             byte[] readBuf = new byte[READ_BUF_SIZE];
             for (int bytesRead = in.read(readBuf); bytesRead != -1; bytesRead = in.read(readBuf)) {
                 if (bytesRef == null) {
+                    searchFrom = 0;
                     bytesRef = new BytesArray(readBuf, 0, bytesRead);
                 } else {
+                    searchFrom = bytesRef.length();
                     bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
                 }
-                bytesRef = splitAndPersist(jobId, bytesRef);
+                bytesRef = splitAndPersist(jobId, bytesRef, searchFrom);
                 readBuf = new byte[READ_BUF_SIZE];
             }
         } catch (IOException e) {
@@ -53,25 +56,25 @@ public class StateProcessor extends AbstractComponent {
      * data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON
      * (as would be uploaded to the public REST API) separated by zero bytes ('\0').
      */
-    private BytesReference splitAndPersist(String jobId, BytesReference bytesRef) {
-        int from = 0;
+    private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) {
+        int splitFrom = 0;
         while (true) {
-            int nextZeroByte = findNextZeroByte(bytesRef, from);
+            int nextZeroByte = findNextZeroByte(bytesRef, searchFrom, splitFrom);
             if (nextZeroByte == -1) {
                 // No more zero bytes in this block
                 break;
             }
-            persister.persistBulkState(jobId, bytesRef.slice(from, nextZeroByte - from));
-            from = nextZeroByte + 1;
+            persister.persistBulkState(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
+            splitFrom = nextZeroByte + 1;
         }
-        if (from >= bytesRef.length()) {
+        if (splitFrom >= bytesRef.length()) {
             return null;
         }
-        return bytesRef.slice(from, bytesRef.length() - from);
+        return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
     }

-    private static int findNextZeroByte(BytesReference bytesRef, int from) {
-        for (int i = from; i < bytesRef.length(); ++i) {
+    private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
+        for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
             if (bytesRef.get(i) == 0) {
                 return i;
             }
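For concreteness, the '\0'-separated format described in the Javadoc above can
be exercised as follows. This is a hypothetical snippet: the bulk bodies are
invented, and stateProcessor is assumed to be constructed as in the tests
below; in production the stream comes from the C++ autodetect process.

String twoDocs =
        "{\"index\":{\"_id\":\"doc1\"}}\n{\"field\":\"value1\"}\n\0"
        + "{\"index\":{\"_id\":\"doc2\"}}\n{\"field\":\"value2\"}\n\0";
stateProcessor.process("my-job", new ByteArrayInputStream(twoDocs.getBytes(StandardCharsets.UTF_8)));
// persistBulkState(...) is called once per '\0'-terminated document; any bytes
// after the last '\0' are buffered until more data arrives on the stream.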

@@ -16,7 +16,9 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.TimeUnit;

 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -36,6 +38,9 @@ public class StateProcessorTests extends ESTestCase {
             + "third data\n"
             + "\0";

+    private static final int NUM_LARGE_DOCS = 2;
+    private static final int LARGE_DOC_SIZE = 16000000;
+
     public void testStateRead() throws IOException {
         ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
@@ -53,4 +58,31 @@ public class StateProcessorTests extends ESTestCase {
         assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
         assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
     }
-}
+
+    /**
+     * This test is designed to pick up N-squared processing in the state consumption code.
+     * The size of the state document is comparable to those that the C++ code will create for a huge model.
+     */
+    public void testLargeStateRead() throws Exception {
+        StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
+        for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
+            builder.append("header").append(docNum).append("\n");
+            for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
+                builder.append("data");
+            }
+            builder.append("\n\0");
+        }
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
+        JobResultsPersister persister = Mockito.mock(JobResultsPersister.class);
+        StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
+
+        // 5 seconds is an overestimate to avoid spurious failures due to VM stalls - on a
+        // reasonable spec laptop this should take around 1 second
+        assertBusy(() -> stateParser.process("_id", stream), 5, TimeUnit.SECONDS);
+
+        verify(persister, times(NUM_LARGE_DOCS)).persistBulkState(eq("_id"), any());
+    }
+}
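
For a sense of scale: each 16 MB document arrives in roughly 2,000 reads of
8 KB. With the old behaviour every read re-scanned everything accumulated since
the last separator, roughly 1 + 2 + ... + 2,000 chunks, or on the order of
16 GB of byte comparisons per document; the fixed code examines each byte once,
about 16 MB per document. That three-orders-of-magnitude gap is what lets the
5-second bound cleanly separate linear from N-squared behaviour.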