diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java deleted file mode 100644 index 3f7ff0d6e2a..00000000000 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.transport.netty4; - -import io.netty.buffer.ByteBuf; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.bytes.AbstractBytesReference; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; - -final class ByteBufBytesReference extends AbstractBytesReference { - - private final ByteBuf buffer; - private final int length; - private final int offset; - - ByteBufBytesReference(ByteBuf buffer, int length) { - this.buffer = buffer; - this.length = length; - this.offset = buffer.readerIndex(); - assert length <= buffer.readableBytes() : "length[" + length +"] > " + buffer.readableBytes(); - } - - @Override - public byte get(int index) { - return buffer.getByte(offset + index); - } - - @Override - public int getInt(int index) { - return buffer.getInt(offset + index); - } - - @Override - public int indexOf(byte marker, int from) { - final int start = offset + from; - return buffer.forEachByte(start, length - start, value -> value != marker); - } - - @Override - public int length() { - return length; - } - - @Override - public BytesReference slice(int from, int length) { - return new ByteBufBytesReference(buffer.slice(offset + from, length), length); - } - - @Override - public StreamInput streamInput() { - return new ByteBufStreamInput(buffer.duplicate(), length); - } - - @Override - public void writeTo(OutputStream os) throws IOException { - buffer.getBytes(offset, os, length); - } - - ByteBuf toByteBuf() { - return buffer.duplicate(); - } - - @Override - public String utf8ToString() { - return buffer.toString(offset, length, StandardCharsets.UTF_8); - } - - @Override - public BytesRef toBytesRef() { - if (buffer.hasArray()) { - return new BytesRef(buffer.array(), buffer.arrayOffset() + offset, length); - } - final byte[] copy = new byte[length]; - buffer.getBytes(offset, copy); - return new BytesRef(copy); - } - - @Override - public long ramBytesUsed() { - return buffer.capacity(); - } - -} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index ba72f52b32a..0f29bb0c139 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -80,28 +81,24 @@ public class Netty4Utils { if (reference.length() == 0) { return Unpooled.EMPTY_BUFFER; } - if (reference instanceof ByteBufBytesReference) { - return ((ByteBufBytesReference) reference).toByteBuf(); - } else { - final BytesRefIterator iterator = reference.iterator(); - // usually we have one, two, or three components from the header, the message, and a buffer - final List buffers = new ArrayList<>(3); - try { - BytesRef slice; - while ((slice = iterator.next()) != null) { - buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length)); - } - - if (buffers.size() == 1) { - return buffers.get(0); - } else { - CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size()); - composite.addComponents(true, buffers); - return composite; - } - } catch (IOException ex) { - throw new AssertionError("no IO happens here", ex); + final BytesRefIterator iterator = reference.iterator(); + // usually we have one, two, or three components from the header, the message, and a buffer + final List buffers = new ArrayList<>(3); + try { + BytesRef slice; + while ((slice = iterator.next()) != null) { + buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length)); } + + if (buffers.size() == 1) { + return buffers.get(0); + } else { + CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size()); + composite.addComponents(true, buffers); + return composite; + } + } catch (IOException ex) { + throw new AssertionError("no IO happens here", ex); } } @@ -112,7 +109,9 @@ public class Netty4Utils { final int readableBytes = buffer.readableBytes(); if (readableBytes == 0) { return BytesArray.EMPTY; + } else { + final ByteBuffer[] byteBuffers = buffer.nioBuffers(); + return BytesReference.fromByteBuffers(byteBuffers); } - return new ByteBufBytesReference(buffer, buffer.readableBytes()); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ByteBufBytesReferenceTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ByteBufBytesReferenceTests.java deleted file mode 100644 index 4a41aaec952..00000000000 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ByteBufBytesReferenceTests.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.transport.netty4; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; - -import java.io.IOException; - -public class ByteBufBytesReferenceTests extends AbstractBytesReferenceTestCase { - - @Override - protected BytesReference newBytesReference(int length) throws IOException { - return newBytesReferenceWithOffsetOfZero(length); - } - - @Override - protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException { - ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(length, bigarrays); - for (int i = 0; i < length; i++) { - out.writeByte((byte) random().nextInt(1 << 8)); - } - assertEquals(out.size(), length); - BytesReference ref = out.bytes(); - assertEquals(ref.length(), length); - BytesRef bytesRef = ref.toBytesRef(); - final ByteBuf buffer = Unpooled.wrappedBuffer(bytesRef.bytes, bytesRef.offset, bytesRef.length); - return Netty4Utils.toBytesReference(buffer); - } - - public void testSliceOnAdvancedBuffer() throws IOException { - BytesReference bytesReference = newBytesReference(randomIntBetween(10, 3 * PAGE_SIZE)); - BytesRef bytesRef = bytesReference.toBytesRef(); - ByteBuf channelBuffer = Unpooled.wrappedBuffer(bytesRef.bytes, bytesRef.offset, - bytesRef.length); - int numBytesToRead = randomIntBetween(1, 5); - for (int i = 0; i < numBytesToRead; i++) { - channelBuffer.readByte(); - } - BytesReference other = Netty4Utils.toBytesReference(channelBuffer); - BytesReference slice = bytesReference.slice(numBytesToRead, bytesReference.length() - numBytesToRead); - assertEquals(other, slice); - assertEquals(other.slice(3, 1), slice.slice(3, 1)); - } - - public void testImmutable() throws IOException { - BytesReference bytesReference = newBytesReference(randomIntBetween(10, 3 * PAGE_SIZE)); - BytesRef bytesRef = BytesRef.deepCopyOf(bytesReference.toBytesRef()); - ByteBuf channelBuffer = Unpooled.wrappedBuffer(bytesRef.bytes, bytesRef.offset, - bytesRef.length); - ByteBufBytesReference byteBufBytesReference = new ByteBufBytesReference(channelBuffer, bytesRef.length); - assertEquals(byteBufBytesReference, bytesReference); - channelBuffer.readInt(); // this advances the index of the channel buffer - assertEquals(byteBufBytesReference, bytesReference); - assertEquals(bytesRef, byteBufBytesReference.toBytesRef()); - - BytesRef unicodeBytes = new BytesRef(randomUnicodeOfCodepointLength(100)); - channelBuffer = Unpooled.wrappedBuffer(unicodeBytes.bytes, unicodeBytes.offset, unicodeBytes.length); - byteBufBytesReference = new ByteBufBytesReference(channelBuffer, unicodeBytes.length); - String utf8ToString = byteBufBytesReference.utf8ToString(); - channelBuffer.readInt(); // this advances the index of the channel buffer - assertEquals(utf8ToString, byteBufBytesReference.utf8ToString()); - } -} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java index 1fbbbe93e47..2b31cf68f01 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java @@ -69,9 +69,7 @@ public class Netty4UtilsTests extends ESTestCase { BytesReference ref = getRandomizedBytesReference(randomIntBetween(1, 3 * PAGE_SIZE)); ByteBuf buffer = Netty4Utils.toByteBuf(ref); BytesReference bytesReference = Netty4Utils.toBytesReference(buffer); - if (ref instanceof ByteBufBytesReference) { - assertEquals(buffer, ((ByteBufBytesReference) ref).toByteBuf()); - } else if (AbstractBytesReferenceTestCase.getNumPages(ref) > 1) { // we gather the buffers into a channel buffer + if (AbstractBytesReferenceTestCase.getNumPages(ref) > 1) { // we gather the buffers into a channel buffer assertTrue(buffer instanceof CompositeByteBuf); } assertArrayEquals(BytesReference.toBytes(ref), BytesReference.toBytes(bytesReference)); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index 0408290b039..6e041b354ec 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -23,15 +23,11 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.common.bytes.AbstractBytesReference; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.StreamInput; -import java.io.EOFException; import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -45,266 +41,34 @@ class ByteBufUtils { if (reference.length() == 0) { return Unpooled.EMPTY_BUFFER; } - if (reference instanceof ByteBufBytesReference) { - return ((ByteBufBytesReference) reference).toByteBuf(); - } else { - final BytesRefIterator iterator = reference.iterator(); - // usually we have one, two, or three components from the header, the message, and a buffer - final List buffers = new ArrayList<>(3); - try { - BytesRef slice; - while ((slice = iterator.next()) != null) { - buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length)); - } - - if (buffers.size() == 1) { - return buffers.get(0); - } else { - CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size()); - composite.addComponents(true, buffers); - return composite; - } - } catch (IOException ex) { - throw new AssertionError("no IO happens here", ex); + final BytesRefIterator iterator = reference.iterator(); + // usually we have one, two, or three components from the header, the message, and a buffer + final List buffers = new ArrayList<>(3); + try { + BytesRef slice; + while ((slice = iterator.next()) != null) { + buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length)); } + + if (buffers.size() == 1) { + return buffers.get(0); + } else { + CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size()); + composite.addComponents(true, buffers); + return composite; + } + } catch (IOException ex) { + throw new AssertionError("no IO happens here", ex); } } static BytesReference toBytesReference(final ByteBuf buffer) { final int readableBytes = buffer.readableBytes(); - return readableBytes == 0 ? BytesArray.EMPTY : new ByteBufBytesReference(buffer, readableBytes); - } - - private static class ByteBufBytesReference extends AbstractBytesReference { - - private final ByteBuf buffer; - private final int length; - private final int offset; - - ByteBufBytesReference(ByteBuf buffer, int length) { - this.buffer = buffer; - this.length = length; - this.offset = buffer.readerIndex(); - assert length <= buffer.readableBytes() : "length[" + length +"] > " + buffer.readableBytes(); - } - - @Override - public byte get(int index) { - return buffer.getByte(offset + index); - } - - @Override - public int getInt(int index) { - return buffer.getInt(offset + index); - } - - @Override - public int indexOf(byte marker, int from) { - final int start = offset + from; - return buffer.forEachByte(start, length - start, value -> value != marker); - } - - @Override - public int length() { - return length; - } - - @Override - public BytesReference slice(int from, int length) { - return new ByteBufBytesReference(buffer.slice(offset + from, length), length); - } - - @Override - public StreamInput streamInput() { - return new ByteBufStreamInput(buffer.duplicate(), length); - } - - @Override - public void writeTo(OutputStream os) throws IOException { - buffer.getBytes(offset, os, length); - } - - ByteBuf toByteBuf() { - return buffer.duplicate(); - } - - @Override - public String utf8ToString() { - return buffer.toString(offset, length, StandardCharsets.UTF_8); - } - - @Override - public BytesRef toBytesRef() { - if (buffer.hasArray()) { - return new BytesRef(buffer.array(), buffer.arrayOffset() + offset, length); - } - final byte[] copy = new byte[length]; - buffer.getBytes(offset, copy); - return new BytesRef(copy); - } - - @Override - public long ramBytesUsed() { - return buffer.capacity(); - } - - } - - private static class ByteBufStreamInput extends StreamInput { - - private final ByteBuf buffer; - private final int endIndex; - - ByteBufStreamInput(ByteBuf buffer, int length) { - if (length > buffer.readableBytes()) { - throw new IndexOutOfBoundsException(); - } - this.buffer = buffer; - int startIndex = buffer.readerIndex(); - endIndex = startIndex + length; - buffer.markReaderIndex(); - } - - @Override - public BytesReference readBytesReference(int length) throws IOException { - // NOTE: It is unsafe to share a reference of the internal structure, so we - // use the default implementation which will copy the bytes. It is unsafe because - // a netty ByteBuf might be pooled which requires a manual release to prevent - // memory leaks. - return super.readBytesReference(length); - } - - @Override - public BytesRef readBytesRef(int length) throws IOException { - // NOTE: It is unsafe to share a reference of the internal structure, so we - // use the default implementation which will copy the bytes. It is unsafe because - // a netty ByteBuf might be pooled which requires a manual release to prevent - // memory leaks. - return super.readBytesRef(length); - } - - @Override - public int available() throws IOException { - return endIndex - buffer.readerIndex(); - } - - @Override - protected void ensureCanReadBytes(int length) throws EOFException { - int bytesAvailable = endIndex - buffer.readerIndex(); - if (bytesAvailable < length) { - throw new EOFException("tried to read: " + length + " bytes but only " + bytesAvailable + " remaining"); - } - } - - @Override - public void mark(int readlimit) { - buffer.markReaderIndex(); - } - - @Override - public boolean markSupported() { - return true; - } - - @Override - public int read() throws IOException { - if (available() == 0) { - return -1; - } - return buffer.readByte() & 0xff; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (len == 0) { - return 0; - } - int available = available(); - if (available == 0) { - return -1; - } - - len = Math.min(available, len); - buffer.readBytes(b, off, len); - return len; - } - - @Override - public short readShort() throws IOException { - try { - return buffer.readShort(); - } catch (IndexOutOfBoundsException ex) { - EOFException eofException = new EOFException(); - eofException.initCause(ex); - throw eofException; - } - } - - @Override - public int readInt() throws IOException { - try { - return buffer.readInt(); - } catch (IndexOutOfBoundsException ex) { - EOFException eofException = new EOFException(); - eofException.initCause(ex); - throw eofException; - } - } - - @Override - public long readLong() throws IOException { - try { - return buffer.readLong(); - } catch (IndexOutOfBoundsException ex) { - EOFException eofException = new EOFException(); - eofException.initCause(ex); - throw eofException; - } - } - - @Override - public void reset() throws IOException { - buffer.resetReaderIndex(); - } - - @Override - public long skip(long n) throws IOException { - if (n > Integer.MAX_VALUE) { - return skipBytes(Integer.MAX_VALUE); - } else { - return skipBytes((int) n); - } - } - - public int skipBytes(int n) throws IOException { - int nBytes = Math.min(available(), n); - buffer.skipBytes(nBytes); - return nBytes; - } - - - @Override - public byte readByte() throws IOException { - try { - return buffer.readByte(); - } catch (IndexOutOfBoundsException ex) { - EOFException eofException = new EOFException(); - eofException.initCause(ex); - throw eofException; - } - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - int read = read(b, offset, len); - if (read < len) { - throw new IndexOutOfBoundsException(); - } - } - - @Override - public void close() throws IOException { - // nothing to do here + if (readableBytes == 0) { + return BytesArray.EMPTY; + } else { + final ByteBuffer[] byteBuffers = buffer.nioBuffers(); + return BytesReference.fromByteBuffers(byteBuffers); } } } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java index bbf72feb72f..ad381c28402 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java @@ -51,6 +51,25 @@ public class ByteBufferReference extends AbstractBytesReference { return buffer.getInt(index); } + @Override + public int indexOf(byte marker, int from) { + final int remainingBytes = Math.max(length - from, 0); + FutureObjects.checkFromIndexSize(from, remainingBytes, length); + if (buffer.hasArray()) { + int startIndex = from + buffer.arrayOffset(); + int endIndex = startIndex + remainingBytes; + final byte[] array = buffer.array(); + for (int i = startIndex; i < endIndex; i++) { + if (array[i] == marker) { + return (i - buffer.arrayOffset()); + } + } + return -1; + } else { + return super.indexOf(marker, from); + } + } + @Override public int length() { return length; diff --git a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java index 5082a0935ec..f82fc4624af 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java @@ -71,6 +71,34 @@ public final class CompositeBytesReference extends AbstractBytesReference { return references[i].get(index - offsets[i]); } + @Override + public int indexOf(byte marker, int from) { + final int remainingBytes = Math.max(length - from, 0); + FutureObjects.checkFromIndexSize(from, remainingBytes, length); + + int result = -1; + if (length == 0) { + return result; + } + + final int firstReferenceIndex = getOffsetIndex(from); + for (int i = firstReferenceIndex; i < references.length; ++i) { + final BytesReference reference = references[i]; + final int internalFrom; + if (i == firstReferenceIndex) { + internalFrom = from - offsets[firstReferenceIndex]; + } else { + internalFrom = 0; + } + result = reference.indexOf(marker, internalFrom); + if (result != -1) { + result += offsets[i]; + break; + } + } + return result; + } + @Override public int length() { return length;