diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java index 48ac4f4f82e..a74dbc0abe0 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java @@ -60,7 +60,7 @@ public class TcpReadWriteHandler extends BytesWriteHandler { references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer()); } Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages); - try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) { + try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) { pipeline.handleBytes(channel, reference); return reference.length(); } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index b058d62f337..f830ebf9b8e 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -94,12 +94,12 @@ public interface BytesReference extends Comparable, ToXContentFr } else if (bufferCount == 1) { return fromByteBuffer(buffers[0]); } else { - ByteBufferReference[] references = new ByteBufferReference[bufferCount]; + BytesReference[] references = new BytesReference[bufferCount]; for (int i = 0; i < bufferCount; ++i) { - references[i] = new ByteBufferReference(buffers[i]); + references[i] = fromByteBuffer(buffers[i]); } - return new CompositeBytesReference(references); + return CompositeBytesReference.of(references); } } @@ -107,6 +107,9 @@ public interface BytesReference extends Comparable, ToXContentFr * Returns BytesReference composed of the provided ByteBuffer. */ static BytesReference fromByteBuffer(ByteBuffer buffer) { + if (buffer.hasArray()) { + return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } return new ByteBufferReference(buffer); } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java index f82fc4624af..52b5a951b85 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java @@ -42,7 +42,20 @@ public final class CompositeBytesReference extends AbstractBytesReference { private final int length; private final long ramBytesUsed; - public CompositeBytesReference(BytesReference... references) { + public static BytesReference of(BytesReference... references) { + switch (references.length) { + case 0: + return BytesArray.EMPTY; + case 1: + return references[0]; + default: + return new CompositeBytesReference(references); + } + } + + private CompositeBytesReference(BytesReference... references) { + assert references.length > 1 + : "Should not build composite reference from less than two references but received [" + references.length + "]"; this.references = Objects.requireNonNull(references, "references must not be null"); this.offsets = new int[references.length]; long ramBytesUsed = 0; @@ -154,29 +167,25 @@ public final class CompositeBytesReference extends AbstractBytesReference { @Override public BytesRefIterator iterator() { - if (references.length > 0) { - return new BytesRefIterator() { - int index = 0; - private BytesRefIterator current = references[index++].iterator(); - @Override - public BytesRef next() throws IOException { - BytesRef next = current.next(); - if (next == null) { - while (index < references.length) { - current = references[index++].iterator(); - next = current.next(); - if (next != null) { - break; - } + return new BytesRefIterator() { + int index = 0; + private BytesRefIterator current = references[index++].iterator(); + + @Override + public BytesRef next() throws IOException { + BytesRef next = current.next(); + if (next == null) { + while (index < references.length) { + current = references[index++].iterator(); + next = current.next(); + if (next != null) { + break; } } - return next; } - }; - } else { - return () -> null; - } - + return next; + } + }; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java b/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java index 4516bfe8b16..6cf20a03a60 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.lease.Releasable; @@ -101,7 +102,7 @@ public class InboundAggregator implements Releasable { releasableContent = firstContent; } else { final ReleasableBytesReference[] references = contentAggregation.toArray(new ReleasableBytesReference[0]); - final CompositeBytesReference content = new CompositeBytesReference(references); + final BytesReference content = CompositeBytesReference.of(references); releasableContent = new ReleasableBytesReference(content, () -> Releasables.close(references)); } diff --git a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java index a9e71c55b4f..533300f8022 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java @@ -164,7 +164,7 @@ public class InboundPipeline implements Releasable { ++index; } final Releasable releasable = () -> Releasables.closeWhileHandlingException(bytesReferences); - return new ReleasableBytesReference(new CompositeBytesReference(bytesReferences), releasable); + return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java index e968a76d3a1..847de9a924d 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java @@ -96,7 +96,7 @@ abstract class OutboundMessage extends NetworkMessage { if (zeroCopyBuffer.length() == 0) { return message; } else { - return new CompositeBytesReference(message, zeroCopyBuffer); + return CompositeBytesReference.of(message, zeroCopyBuffer); } } diff --git a/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java index f99c9405502..2f283cbaa75 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java @@ -40,7 +40,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException { // we know bytes stream output always creates a paged bytes reference, we use it to create randomized content List referenceList = newRefList(length); - BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0])); + BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0])); assertEquals(length, ref.length()); return ref; } @@ -63,7 +63,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase public void testCompositeBuffer() throws IOException { List referenceList = newRefList(randomIntBetween(1, PAGE_SIZE * 2)); - BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0])); + BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0])); BytesRefIterator iterator = ref.iterator(); BytesRefBuilder builder = new BytesRefBuilder(); @@ -116,7 +116,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase } public void testSliceIsNotCompositeIfMatchesSingleSubSlice() { - CompositeBytesReference bytesRef = new CompositeBytesReference( + BytesReference bytesRef = CompositeBytesReference.of( new BytesArray(new byte[12]), new BytesArray(new byte[15]), new BytesArray(new byte[13])); diff --git a/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java index fac05d822ff..fc2e1506853 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceTests.java @@ -77,7 +77,7 @@ public class ReleasableBytesReferenceTests extends AbstractBytesReferenceTestCas referenceList.add(out.bytes()); i += sliceLength; } - BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0])); + BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0])); assertThat(length, equalTo(ref.length())); delegate = ref; } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java index 0d0cd3719a1..3c3771b7d50 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java @@ -73,7 +73,7 @@ public class TransportDecompressorTests extends ESTestCase { ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(); ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(); assertNull(decompressor.pollDecompressedPage()); - CompositeBytesReference composite = new CompositeBytesReference(reference1, reference2, reference3); + BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3); assertEquals(4 * 10000, composite.length()); StreamInput streamInput = composite.streamInput(); for (int i = 0; i < 10000; ++i) { @@ -114,7 +114,7 @@ public class TransportDecompressorTests extends ESTestCase { ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(); ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(); assertNull(decompressor.pollDecompressedPage()); - CompositeBytesReference composite = new CompositeBytesReference(reference1, reference2, reference3); + BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3); assertEquals(4 * 10000, composite.length()); StreamInput streamInput = composite.streamInput(); for (int i = 0; i < 10000; ++i) { diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index ed2c33360ef..b907cb289be 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -231,7 +231,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { final int start = getContentRangeStart(range); final int end = getContentRangeEnd(range); - blob = new CompositeBytesReference(blob, requestBody); + blob = CompositeBytesReference.of(blob, requestBody); blobs.put(blobName, blob); if (limit == null) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index aad19b8d0a5..34cee1c4020 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -292,7 +292,7 @@ public class MockNioTransport extends TcpTransport { references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer()); } Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages); - try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) { + try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) { pipeline.handleBytes(channel, reference); return reference.length(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java index 3b65a739e82..0e1dc158d84 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java @@ -48,7 +48,7 @@ public class NormalizerResultHandler { if (bytesRef == null) { bytesRef = new BytesArray(readBuf, 0, bytesRead); } else { - bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead)); + bytesRef = CompositeBytesReference.of(bytesRef, new BytesArray(readBuf, 0, bytesRead)); } bytesRef = parseResults(xContent, bytesRef); readBuf = new byte[READ_BUF_SIZE]; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index d6f70670121..2b45aa9bf91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -86,8 +86,8 @@ public class IndexingStateProcessor implements StateProcessor { if (findNextZeroByte(newBlock, 0, 0) == -1) { searchFrom += bytesRead; } else { - BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0])); - bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes); + BytesReference newBytes = CompositeBytesReference.of(newBlocks.toArray(new BytesReference[0])); + bytesToDate = (bytesToDate == null) ? newBytes : CompositeBytesReference.of(bytesToDate, newBytes); bytesToDate = splitAndPersist(bytesToDate, searchFrom); searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length(); newBlocks.clear(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java index 01638d7c68f..86c23c439eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java @@ -106,7 +106,7 @@ public class CppLogMessageHandler implements Closeable { if (bytesRef == null) { bytesRef = new BytesArray(readBuf, 0, bytesRead); } else { - bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead)); + bytesRef = CompositeBytesReference.of(bytesRef, new BytesArray(readBuf, 0, bytesRead)); } bytesRef = parseMessages(xContent, bytesRef); readBuf = new byte[readBufSize];