From 0a879b95d1d1af5abd51ec6a748ea88fe775cf72 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 12 May 2020 20:33:45 +0200 Subject: [PATCH] Save Bounds Checks in BytesReference (#56577) (#56621) Two spots that allow for some optimization: * We are often creating a composite reference of just a single item in the transport layer => special cased via static constructor to make sure we never do that * Also removed the pointless case of an empty composite bytes ref * `ByteBufferReference` is practically always created from a heap buffer these days so there is no point of dealing with all the bounds checks and extra references to sliced buffers from that and we can just use the underlying array directly --- .../transport/nio/TcpReadWriteHandler.java | 2 +- .../common/bytes/BytesReference.java | 9 ++-- .../common/bytes/CompositeBytesReference.java | 51 +++++++++++-------- .../transport/InboundAggregator.java | 3 +- .../transport/InboundPipeline.java | 2 +- .../transport/OutboundMessage.java | 2 +- .../bytes/CompositeBytesReferenceTests.java | 6 +-- .../bytes/ReleasableBytesReferenceTests.java | 2 +- .../transport/TransportDecompressorTests.java | 4 +- .../gcs/GoogleCloudStorageHttpHandler.java | 2 +- .../transport/nio/MockNioTransport.java | 2 +- .../output/NormalizerResultHandler.java | 2 +- .../ml/process/IndexingStateProcessor.java | 4 +- .../process/logging/CppLogMessageHandler.java | 2 +- 14 files changed, 53 insertions(+), 40 deletions(-) diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java index 48ac4f4f82e..a74dbc0abe0 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java @@ -60,7 +60,7 @@ public class TcpReadWriteHandler extends BytesWriteHandler { references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer()); } Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages); - try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) { + try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) { pipeline.handleBytes(channel, reference); return reference.length(); } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index b058d62f337..f830ebf9b8e 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -94,12 +94,12 @@ public interface BytesReference extends Comparable, ToXContentFr } else if (bufferCount == 1) { return fromByteBuffer(buffers[0]); } else { - ByteBufferReference[] references = new ByteBufferReference[bufferCount]; + BytesReference[] references = new BytesReference[bufferCount]; for (int i = 0; i < bufferCount; ++i) { - references[i] = new ByteBufferReference(buffers[i]); + references[i] = fromByteBuffer(buffers[i]); } - return new CompositeBytesReference(references); + return CompositeBytesReference.of(references); } } @@ -107,6 +107,9 @@ public interface BytesReference extends Comparable, ToXContentFr * Returns BytesReference composed of the provided ByteBuffer. */ static BytesReference fromByteBuffer(ByteBuffer buffer) { + if (buffer.hasArray()) { + return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } return new ByteBufferReference(buffer); } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java index f82fc4624af..52b5a951b85 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java @@ -42,7 +42,20 @@ public final class CompositeBytesReference extends AbstractBytesReference { private final int length; private final long ramBytesUsed; - public CompositeBytesReference(BytesReference... references) { + public static BytesReference of(BytesReference... references) { + switch (references.length) { + case 0: + return BytesArray.EMPTY; + case 1: + return references[0]; + default: + return new CompositeBytesReference(references); + } + } + + private CompositeBytesReference(BytesReference... references) { + assert references.length > 1 + : "Should not build composite reference from less than two references but received [" + references.length + "]"; this.references = Objects.requireNonNull(references, "references must not be null"); this.offsets = new int[references.length]; long ramBytesUsed = 0; @@ -154,29 +167,25 @@ public final class CompositeBytesReference extends AbstractBytesReference { @Override public BytesRefIterator iterator() { - if (references.length > 0) { - return new BytesRefIterator() { - int index = 0; - private BytesRefIterator current = references[index++].iterator(); - @Override - public BytesRef next() throws IOException { - BytesRef next = current.next(); - if (next == null) { - while (index < references.length) { - current = references[index++].iterator(); - next = current.next(); - if (next != null) { - break; - } + return new BytesRefIterator() { + int index = 0; + private BytesRefIterator current = references[index++].iterator(); + + @Override + public BytesRef next() throws IOException { + BytesRef next = current.next(); + if (next == null) { + while (index < references.length) { + current = references[index++].iterator(); + next = current.next(); + if (next != null) { + break; } } - return next; } - }; - } else { - return () -> null; - } - + return next; + } + }; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java b/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java index 4516bfe8b16..6cf20a03a60 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.lease.Releasable; @@ -101,7 +102,7 @@ public class InboundAggregator implements Releasable { releasableContent = firstContent; } else { final ReleasableBytesReference[] references = contentAggregation.toArray(new ReleasableBytesReference[0]); - final CompositeBytesReference content = new CompositeBytesReference(references); + final BytesReference content = CompositeBytesReference.of(references); releasableContent = new ReleasableBytesReference(content, () -> Releasables.close(references)); } diff --git a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java index a9e71c55b4f..533300f8022 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java @@ -164,7 +164,7 @@ public class InboundPipeline implements Releasable { ++index; } final Releasable releasable = () -> Releasables.closeWhileHandlingException(bytesReferences); - return new ReleasableBytesReference(new CompositeBytesReference(bytesReferences), releasable); + return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java index e968a76d3a1..847de9a924d 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java @@ -96,7 +96,7 @@ abstract class OutboundMessage extends NetworkMessage { if (zeroCopyBuffer.length() == 0) { return message; } else { - return new CompositeBytesReference(message, zeroCopyBuffer); + return CompositeBytesReference.of(message, zeroCopyBuffer); } } diff --git a/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java index f99c9405502..2f283cbaa75 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java @@ -40,7 +40,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException { // we know bytes stream output always creates a paged bytes reference, we use it to create randomized content List referenceList = newRefList(length); - BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0])); + BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0])); assertEquals(length, ref.length()); return ref; } @@ -63,7 +63,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase public void testCompositeBuffer() throws IOException { List referenceList = newRefList(randomIntBetween(1, PAGE_SIZE * 2)); - BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0])); + BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0])); BytesRefIterator iterator = ref.iterator(); BytesRefBuilder builder = new BytesRefBuilder(); @@ -116,7 +116,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase } public void testSliceIsNotCompositeIfMatchesSingleSubSlice() { - CompositeBytesReference bytesRef = new CompositeBytesReference( + BytesReference bytesRef = CompositeBytesReference.of( new BytesArray(new byte[12]), new BytesArray(new byte[15]), new BytesArray(new byte[13])); diff --git a/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java index fac05d822ff..fc2e1506853 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java @@ -77,7 +77,7 @@ public class ReleasableBytesReferenceTests extends AbstractBytesReferenceTestCas referenceList.add(out.bytes()); i += sliceLength; } - BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0])); + BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0])); assertThat(length, equalTo(ref.length())); delegate = ref; } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java index 0d0cd3719a1..3c3771b7d50 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java @@ -73,7 +73,7 @@ public class TransportDecompressorTests extends ESTestCase { ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(); ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(); assertNull(decompressor.pollDecompressedPage()); - CompositeBytesReference composite = new CompositeBytesReference(reference1, reference2, reference3); + BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3); assertEquals(4 * 10000, composite.length()); StreamInput streamInput = composite.streamInput(); for (int i = 0; i < 10000; ++i) { @@ -114,7 +114,7 @@ public class TransportDecompressorTests extends ESTestCase { ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(); ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(); assertNull(decompressor.pollDecompressedPage()); - CompositeBytesReference composite = new CompositeBytesReference(reference1, reference2, reference3); + BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3); assertEquals(4 * 10000, composite.length()); StreamInput streamInput = composite.streamInput(); for (int i = 0; i < 10000; ++i) { diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index ed2c33360ef..b907cb289be 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -231,7 +231,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { final int start = getContentRangeStart(range); final int end = getContentRangeEnd(range); - blob = new CompositeBytesReference(blob, requestBody); + blob = CompositeBytesReference.of(blob, requestBody); blobs.put(blobName, blob); if (limit == null) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index aad19b8d0a5..34cee1c4020 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -292,7 +292,7 @@ public class MockNioTransport extends TcpTransport { references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer()); } Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages); - try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) { + try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) { pipeline.handleBytes(channel, reference); return reference.length(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java index 3b65a739e82..0e1dc158d84 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java @@ -48,7 +48,7 @@ public class NormalizerResultHandler { if (bytesRef == null) { bytesRef = new BytesArray(readBuf, 0, bytesRead); } else { - bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead)); + bytesRef = CompositeBytesReference.of(bytesRef, new BytesArray(readBuf, 0, bytesRead)); } bytesRef = parseResults(xContent, bytesRef); readBuf = new byte[READ_BUF_SIZE]; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index d6f70670121..2b45aa9bf91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -86,8 +86,8 @@ public class IndexingStateProcessor implements StateProcessor { if (findNextZeroByte(newBlock, 0, 0) == -1) { searchFrom += bytesRead; } else { - BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0])); - bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes); + BytesReference newBytes = CompositeBytesReference.of(newBlocks.toArray(new BytesReference[0])); + bytesToDate = (bytesToDate == null) ? newBytes : CompositeBytesReference.of(bytesToDate, newBytes); bytesToDate = splitAndPersist(bytesToDate, searchFrom); searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length(); newBlocks.clear(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java index 01638d7c68f..86c23c439eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java @@ -106,7 +106,7 @@ public class CppLogMessageHandler implements Closeable { if (bytesRef == null) { bytesRef = new BytesArray(readBuf, 0, bytesRead); } else { - bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead)); + bytesRef = CompositeBytesReference.of(bytesRef, new BytesArray(readBuf, 0, bytesRead)); } bytesRef = parseMessages(xContent, bytesRef); readBuf = new byte[readBufSize];