Optimize Bulk Message Parsing and Message Length Parsing (#39634) (#39730)

* Optimize Bulk Message Parsing and Message Length Parsing

* findNextMarker took almost 1ms per invocation during the PMC rally track
  * Fixed to be about an order of magnitude faster by using Netty's bulk `ByteBuf` search
* It is unnecessary to instantiate an object (the input stream wrapper) and throw it away, just to read the `int` length from the message bytes
  * Fixed by adding bulk `int` read to BytesReference
This commit is contained in:
Armin Braun 2019-03-06 08:13:15 +01:00 committed by GitHub
parent 4a3fa5ac7d
commit aaecaf59a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 105 additions and 24 deletions

View File

@ -45,6 +45,17 @@ final class ByteBufBytesReference extends BytesReference {
return buffer.getByte(offset + index); return buffer.getByte(offset + index);
} }
@Override
public int getInt(int index) {
return buffer.getInt(offset + index);
}
@Override
public int indexOf(byte marker, int from) {
final int start = offset + from;
return buffer.forEachByte(start, length - start, value -> value != marker);
}
@Override @Override
public int length() { public int length() {
return length; return length;

View File

@ -90,6 +90,17 @@ class ByteBufUtils {
return buffer.getByte(offset + index); return buffer.getByte(offset + index);
} }
@Override
public int getInt(int index) {
return buffer.getInt(offset + index);
}
@Override
public int indexOf(byte marker, int from) {
final int start = offset + from;
return buffer.forEachByte(start, length - start, value -> value != marker);
}
@Override @Override
public int length() { public int length() {
return length; return length;

View File

@ -364,11 +364,10 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
XContent xContent = xContentType.xContent(); XContent xContent = xContentType.xContent();
int line = 0; int line = 0;
int from = 0; int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator(); byte marker = xContent.streamSeparator();
boolean typesDeprecationLogged = false; boolean typesDeprecationLogged = false;
while (true) { while (true) {
int nextMarker = findNextMarker(marker, from, data, length); int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) { if (nextMarker == -1) {
break; break;
} }
@ -477,7 +476,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
add(new DeleteRequest(index, type, id).routing(routing) add(new DeleteRequest(index, type, id).routing(routing)
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload); .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
} else { } else {
nextMarker = findNextMarker(marker, from, data, length); nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) { if (nextMarker == -1) {
break; break;
} }
@ -615,16 +614,16 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return globalRouting; return globalRouting;
} }
private int findNextMarker(byte marker, int from, BytesReference data, int length) { private static int findNextMarker(byte marker, int from, BytesReference data) {
for (int i = from; i < length; i++) { final int res = data.indexOf(marker, from);
if (data.get(i) == marker) { if (res != -1) {
return i; assert res >= 0;
} return res;
} }
if (from != length) { if (from != data.length()) {
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]"); throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]");
} }
return -1; return res;
} }
@Override @Override

View File

@ -177,10 +177,9 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
NamedXContentRegistry registry, NamedXContentRegistry registry,
boolean allowExplicitIndex) throws IOException { boolean allowExplicitIndex) throws IOException {
int from = 0; int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator(); byte marker = xContent.streamSeparator();
while (true) { while (true) {
int nextMarker = findNextMarker(marker, from, data, length); int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) { if (nextMarker == -1) {
break; break;
} }
@ -261,7 +260,7 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
// move pointers // move pointers
from = nextMarker + 1; from = nextMarker + 1;
// now for the body // now for the body
nextMarker = findNextMarker(marker, from, data, length); nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) { if (nextMarker == -1) {
break; break;
} }
@ -275,13 +274,13 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
} }
} }
private static int findNextMarker(byte marker, int from, BytesReference data, int length) { private static int findNextMarker(byte marker, int from, BytesReference data) {
for (int i = from; i < length; i++) { final int res = data.indexOf(marker, from);
if (data.get(i) == marker) { if (res != -1) {
return i; assert res >= 0;
} return res;
} }
if (from != length) { if (from != data.length()) {
throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]"); throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]");
} }
return -1; return -1;

View File

@ -46,6 +46,11 @@ public class ByteBufferReference extends BytesReference {
return buffer.get(index); return buffer.get(index);
} }
@Override
public int getInt(int index) {
return buffer.getInt(index);
}
@Override @Override
public int length() { public int length() {
return length; return length;

View File

@ -60,6 +60,29 @@ public abstract class BytesReference implements Comparable<BytesReference>, ToXC
*/ */
public abstract byte get(int index); public abstract byte get(int index);
/**
* Returns the integer read from the 4 bytes (BE) starting at the given index.
*/
public int getInt(int index) {
return (get(index) & 0xFF) << 24 | (get(index + 1) & 0xFF) << 16 | (get(index + 2) & 0xFF) << 8 | get(index + 3) & 0xFF;
}
/**
* Finds the index of the first occurrence of the given marker between within the given bounds.
* @param marker marker byte to search
* @param from lower bound for the index to check (inclusive)
* @return first index of the marker or {@code -1} if not found
*/
public int indexOf(byte marker, int from) {
final int to = length();
for (int i = from; i < to; i++) {
if (get(i) == marker) {
return i;
}
}
return -1;
}
/** /**
* The length. * The length.
*/ */

View File

@ -840,11 +840,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
+ Integer.toHexString(headerBuffer.get(2) & 0xFF) + "," + Integer.toHexString(headerBuffer.get(2) & 0xFF) + ","
+ Integer.toHexString(headerBuffer.get(3) & 0xFF) + ")"); + Integer.toHexString(headerBuffer.get(3) & 0xFF) + ")");
} }
final int messageLength; final int messageLength = headerBuffer.getInt(TcpHeader.MARKER_BYTES_SIZE);
try (StreamInput input = headerBuffer.streamInput()) {
input.skip(TcpHeader.MARKER_BYTES_SIZE);
messageLength = input.readInt();
}
if (messageLength == TransportKeepAlive.PING_DATA_SIZE) { if (messageLength == TransportKeepAlive.PING_DATA_SIZE) {
// This is a ping // This is a ping

View File

@ -34,7 +34,14 @@ import org.elasticsearch.test.ESTestCase;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class AbstractBytesReferenceTestCase extends ESTestCase { public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
@ -648,4 +655,34 @@ public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
assertNotEquals(b1, b2); assertNotEquals(b1, b2);
} }
} }
public void testGetInt() throws IOException {
final int count = randomIntBetween(1, 10);
final BytesReference bytesReference = newBytesReference(count * Integer.BYTES);
final BytesRef bytesRef = bytesReference.toBytesRef();
final IntBuffer intBuffer =
ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length).order(ByteOrder.BIG_ENDIAN).asIntBuffer();
for (int i = 0; i < count; ++i) {
assertEquals(intBuffer.get(i), bytesReference.getInt(i * Integer.BYTES));
}
}
public void testIndexOf() throws IOException {
final int size = randomIntBetween(0, 100);
final BytesReference bytesReference = newBytesReference(size);
final Map<Byte, List<Integer>> map = new HashMap<>();
for (int i = 0; i < size; ++i) {
final byte value = bytesReference.get(i);
map.computeIfAbsent(value, v -> new ArrayList<>()).add(i);
}
map.forEach((value, positions) -> {
for (int i = 0; i < positions.size(); i++) {
final int pos = positions.get(i);
final int from = i == 0 ? randomIntBetween(0, pos) : positions.get(i - 1) + 1;
assertEquals(bytesReference.indexOf(value, from), pos);
}
});
final byte missing = randomValueOtherThanMany(map::containsKey, ESTestCase::randomByte);
assertEquals(-1, bytesReference.indexOf(missing, randomIntBetween(0, Math.max(0, size - 1))));
}
} }