Two spots that allow for some optimization: * We are often creating a composite reference of just a single item in the transport layer => special cased via static constructor to make sure we never do that * Also removed the pointless case of an empty composite bytes ref * `ByteBufferReference` is practically always created from a heap buffer these days so there is no point of dealing with all the bounds checks and extra references to sliced buffers from that and we can just use the underlying array directly
This commit is contained in:
parent
f7b8f0b2f4
commit
0a879b95d1
|
@ -60,7 +60,7 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
|
|||
references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer());
|
||||
}
|
||||
Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages);
|
||||
try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) {
|
||||
try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) {
|
||||
pipeline.handleBytes(channel, reference);
|
||||
return reference.length();
|
||||
}
|
||||
|
|
|
@ -94,12 +94,12 @@ public interface BytesReference extends Comparable<BytesReference>, ToXContentFr
|
|||
} else if (bufferCount == 1) {
|
||||
return fromByteBuffer(buffers[0]);
|
||||
} else {
|
||||
ByteBufferReference[] references = new ByteBufferReference[bufferCount];
|
||||
BytesReference[] references = new BytesReference[bufferCount];
|
||||
for (int i = 0; i < bufferCount; ++i) {
|
||||
references[i] = new ByteBufferReference(buffers[i]);
|
||||
references[i] = fromByteBuffer(buffers[i]);
|
||||
}
|
||||
|
||||
return new CompositeBytesReference(references);
|
||||
return CompositeBytesReference.of(references);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,6 +107,9 @@ public interface BytesReference extends Comparable<BytesReference>, ToXContentFr
|
|||
* Returns BytesReference composed of the provided ByteBuffer.
|
||||
*/
|
||||
static BytesReference fromByteBuffer(ByteBuffer buffer) {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
|
||||
}
|
||||
return new ByteBufferReference(buffer);
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,20 @@ public final class CompositeBytesReference extends AbstractBytesReference {
|
|||
private final int length;
|
||||
private final long ramBytesUsed;
|
||||
|
||||
public CompositeBytesReference(BytesReference... references) {
|
||||
public static BytesReference of(BytesReference... references) {
|
||||
switch (references.length) {
|
||||
case 0:
|
||||
return BytesArray.EMPTY;
|
||||
case 1:
|
||||
return references[0];
|
||||
default:
|
||||
return new CompositeBytesReference(references);
|
||||
}
|
||||
}
|
||||
|
||||
private CompositeBytesReference(BytesReference... references) {
|
||||
assert references.length > 1
|
||||
: "Should not build composite reference from less than two references but received [" + references.length + "]";
|
||||
this.references = Objects.requireNonNull(references, "references must not be null");
|
||||
this.offsets = new int[references.length];
|
||||
long ramBytesUsed = 0;
|
||||
|
@ -154,10 +167,10 @@ public final class CompositeBytesReference extends AbstractBytesReference {
|
|||
|
||||
@Override
|
||||
public BytesRefIterator iterator() {
|
||||
if (references.length > 0) {
|
||||
return new BytesRefIterator() {
|
||||
int index = 0;
|
||||
private BytesRefIterator current = references[index++].iterator();
|
||||
|
||||
@Override
|
||||
public BytesRef next() throws IOException {
|
||||
BytesRef next = current.next();
|
||||
|
@ -173,10 +186,6 @@ public final class CompositeBytesReference extends AbstractBytesReference {
|
|||
return next;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return () -> null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.transport;
|
|||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
|
@ -101,7 +102,7 @@ public class InboundAggregator implements Releasable {
|
|||
releasableContent = firstContent;
|
||||
} else {
|
||||
final ReleasableBytesReference[] references = contentAggregation.toArray(new ReleasableBytesReference[0]);
|
||||
final CompositeBytesReference content = new CompositeBytesReference(references);
|
||||
final BytesReference content = CompositeBytesReference.of(references);
|
||||
releasableContent = new ReleasableBytesReference(content, () -> Releasables.close(references));
|
||||
}
|
||||
|
||||
|
|
|
@ -164,7 +164,7 @@ public class InboundPipeline implements Releasable {
|
|||
++index;
|
||||
}
|
||||
final Releasable releasable = () -> Releasables.closeWhileHandlingException(bytesReferences);
|
||||
return new ReleasableBytesReference(new CompositeBytesReference(bytesReferences), releasable);
|
||||
return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ abstract class OutboundMessage extends NetworkMessage {
|
|||
if (zeroCopyBuffer.length() == 0) {
|
||||
return message;
|
||||
} else {
|
||||
return new CompositeBytesReference(message, zeroCopyBuffer);
|
||||
return CompositeBytesReference.of(message, zeroCopyBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase
|
|||
protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException {
|
||||
// we know bytes stream output always creates a paged bytes reference, we use it to create randomized content
|
||||
List<BytesReference> referenceList = newRefList(length);
|
||||
BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0]));
|
||||
BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0]));
|
||||
assertEquals(length, ref.length());
|
||||
return ref;
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase
|
|||
|
||||
public void testCompositeBuffer() throws IOException {
|
||||
List<BytesReference> referenceList = newRefList(randomIntBetween(1, PAGE_SIZE * 2));
|
||||
BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0]));
|
||||
BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0]));
|
||||
BytesRefIterator iterator = ref.iterator();
|
||||
BytesRefBuilder builder = new BytesRefBuilder();
|
||||
|
||||
|
@ -116,7 +116,7 @@ public class CompositeBytesReferenceTests extends AbstractBytesReferenceTestCase
|
|||
}
|
||||
|
||||
public void testSliceIsNotCompositeIfMatchesSingleSubSlice() {
|
||||
CompositeBytesReference bytesRef = new CompositeBytesReference(
|
||||
BytesReference bytesRef = CompositeBytesReference.of(
|
||||
new BytesArray(new byte[12]),
|
||||
new BytesArray(new byte[15]),
|
||||
new BytesArray(new byte[13]));
|
||||
|
|
|
@ -77,7 +77,7 @@ public class ReleasableBytesReferenceTests extends AbstractBytesReferenceTestCas
|
|||
referenceList.add(out.bytes());
|
||||
i += sliceLength;
|
||||
}
|
||||
BytesReference ref = new CompositeBytesReference(referenceList.toArray(new BytesReference[0]));
|
||||
BytesReference ref = CompositeBytesReference.of(referenceList.toArray(new BytesReference[0]));
|
||||
assertThat(length, equalTo(ref.length()));
|
||||
delegate = ref;
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TransportDecompressorTests extends ESTestCase {
|
|||
ReleasableBytesReference reference2 = decompressor.pollDecompressedPage();
|
||||
ReleasableBytesReference reference3 = decompressor.pollDecompressedPage();
|
||||
assertNull(decompressor.pollDecompressedPage());
|
||||
CompositeBytesReference composite = new CompositeBytesReference(reference1, reference2, reference3);
|
||||
BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3);
|
||||
assertEquals(4 * 10000, composite.length());
|
||||
StreamInput streamInput = composite.streamInput();
|
||||
for (int i = 0; i < 10000; ++i) {
|
||||
|
@ -114,7 +114,7 @@ public class TransportDecompressorTests extends ESTestCase {
|
|||
ReleasableBytesReference reference2 = decompressor.pollDecompressedPage();
|
||||
ReleasableBytesReference reference3 = decompressor.pollDecompressedPage();
|
||||
assertNull(decompressor.pollDecompressedPage());
|
||||
CompositeBytesReference composite = new CompositeBytesReference(reference1, reference2, reference3);
|
||||
BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3);
|
||||
assertEquals(4 * 10000, composite.length());
|
||||
StreamInput streamInput = composite.streamInput();
|
||||
for (int i = 0; i < 10000; ++i) {
|
||||
|
|
|
@ -231,7 +231,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
final int start = getContentRangeStart(range);
|
||||
final int end = getContentRangeEnd(range);
|
||||
|
||||
blob = new CompositeBytesReference(blob, requestBody);
|
||||
blob = CompositeBytesReference.of(blob, requestBody);
|
||||
blobs.put(blobName, blob);
|
||||
|
||||
if (limit == null) {
|
||||
|
|
|
@ -292,7 +292,7 @@ public class MockNioTransport extends TcpTransport {
|
|||
references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer());
|
||||
}
|
||||
Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages);
|
||||
try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) {
|
||||
try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) {
|
||||
pipeline.handleBytes(channel, reference);
|
||||
return reference.length();
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ public class NormalizerResultHandler {
|
|||
if (bytesRef == null) {
|
||||
bytesRef = new BytesArray(readBuf, 0, bytesRead);
|
||||
} else {
|
||||
bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
|
||||
bytesRef = CompositeBytesReference.of(bytesRef, new BytesArray(readBuf, 0, bytesRead));
|
||||
}
|
||||
bytesRef = parseResults(xContent, bytesRef);
|
||||
readBuf = new byte[READ_BUF_SIZE];
|
||||
|
|
|
@ -86,8 +86,8 @@ public class IndexingStateProcessor implements StateProcessor {
|
|||
if (findNextZeroByte(newBlock, 0, 0) == -1) {
|
||||
searchFrom += bytesRead;
|
||||
} else {
|
||||
BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0]));
|
||||
bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes);
|
||||
BytesReference newBytes = CompositeBytesReference.of(newBlocks.toArray(new BytesReference[0]));
|
||||
bytesToDate = (bytesToDate == null) ? newBytes : CompositeBytesReference.of(bytesToDate, newBytes);
|
||||
bytesToDate = splitAndPersist(bytesToDate, searchFrom);
|
||||
searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length();
|
||||
newBlocks.clear();
|
||||
|
|
|
@ -106,7 +106,7 @@ public class CppLogMessageHandler implements Closeable {
|
|||
if (bytesRef == null) {
|
||||
bytesRef = new BytesArray(readBuf, 0, bytesRead);
|
||||
} else {
|
||||
bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
|
||||
bytesRef = CompositeBytesReference.of(bytesRef, new BytesArray(readBuf, 0, bytesRead));
|
||||
}
|
||||
bytesRef = parseMessages(xContent, bytesRef);
|
||||
readBuf = new byte[readBufSize];
|
||||
|
|
Loading…
Reference in New Issue