diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java new file mode 100644 index 00000000000..fbdcdfd6885 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; + +import java.nio.ByteBuffer; + +/** + * This is a {@link BytesReference} backed by a {@link ByteBuffer}. The byte buffer can either be a heap or + * direct byte buffer. The reference is composed of the space between the {@link ByteBuffer#position} and + * {@link ByteBuffer#limit} at construction time. If the position or limit of the underlying byte buffer is + * changed, those changes will not be reflected in this reference. However, modifying the limit or position + * of the underlying byte buffer is not recommended as those can be used during {@link ByteBuffer#get()} + * bounds checks. Use {@link ByteBuffer#duplicate()} at creation time if you plan on modifying the markers of + * the underlying byte buffer. Any changes to the underlying data in the byte buffer will be reflected. + */ +public class ByteBufferReference extends BytesReference { + + private final ByteBuffer buffer; + private final int offset; + private final int length; + + public ByteBufferReference(ByteBuffer buffer) { + this.buffer = buffer; + this.offset = buffer.position(); + this.length = buffer.remaining(); + } + + @Override + public byte get(int index) { + return buffer.get(index + offset); + } + + @Override + public int length() { + return length; + } + + @Override + public BytesReference slice(int from, int length) { + if (from < 0 || (from + length) > this.length) { + throw new IndexOutOfBoundsException("can't slice a buffer with length [" + this.length + "], with slice parameters from [" + + from + "], length [" + length + "]"); + } + ByteBuffer newByteBuffer = buffer.duplicate(); + newByteBuffer.position(offset + from); + newByteBuffer.limit(offset + from + length); + return new ByteBufferReference(newByteBuffer); + } + + /** + * This will return a bytes ref composed of the bytes. If this is a direct byte buffer, the bytes will + * have to be copied. 
+ * + * @return the bytes ref + */ + @Override + public BytesRef toBytesRef() { + if (buffer.hasArray()) { + return new BytesRef(buffer.array(), buffer.arrayOffset() + offset, length); + } + final byte[] copy = new byte[length]; + buffer.get(copy, offset, length); + return new BytesRef(copy); + } + + @Override + public long ramBytesUsed() { + return buffer.capacity(); + } +} diff --git a/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java b/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java new file mode 100644 index 00000000000..9560fd40038 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class ByteBufferReferenceTests extends AbstractBytesReferenceTestCase { + + private void initializeBytes(byte[] bytes) { + for (int i = 0 ; i < bytes.length; ++i) { + bytes[i] = (byte) i; + } + } + + @Override + protected BytesReference newBytesReference(int length) throws IOException { + return newBytesReferenceWithOffsetOfZero(length); + } + + @Override + protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException { + byte[] bytes = new byte[length]; + initializeBytes(bytes); + return new ByteBufferReference(ByteBuffer.wrap(bytes)); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java new file mode 100644 index 00000000000..46cec52bb6c --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -0,0 +1,204 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.transport.nio; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.function.Supplier; + +/** + * This is a channel byte buffer composed internally of 16KB pages. When an entire message has been read + * and consumed, the {@link #release(long)} method releases the bytes from the head of the buffer and removes + * the consumed pages internally. If more space is needed at the end of the buffer, {@link #ensureCapacity(long)} can + * be called and the buffer will expand using the supplier provided. + */ +public final class InboundChannelBuffer { + + private static final int PAGE_SIZE = 1 << 14; + private static final int PAGE_MASK = PAGE_SIZE - 1; + private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); + private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; + + + private final ArrayDeque<ByteBuffer> pages; + private final Supplier<ByteBuffer> pageSupplier; + + private long capacity = 0; + private long internalIndex = 0; + // The offset is an int as it is the offset of where the bytes begin in the first buffer + private int offset = 0; + + public InboundChannelBuffer() { + this(() -> ByteBuffer.wrap(new byte[PAGE_SIZE])); + } + + private InboundChannelBuffer(Supplier<ByteBuffer> pageSupplier) { + this.pageSupplier = pageSupplier; + this.pages = new ArrayDeque<>(); + this.capacity = PAGE_SIZE * pages.size(); + ensureCapacity(PAGE_SIZE); + } + + public void ensureCapacity(long requiredCapacity) { + if (capacity < requiredCapacity) { + int numPages = numPages(requiredCapacity + offset); + int pagesToAdd = numPages - pages.size(); + for (int i = 0; i < pagesToAdd; i++) { + pages.addLast(pageSupplier.get()); + } + capacity += pagesToAdd * PAGE_SIZE; + } + } + + /** + * This method will release bytes from the head of this buffer. If more bytes are released than the current + * index, the index is reset to zero. + * + * @param bytesToRelease number of bytes to drop + */ + public void release(long bytesToRelease) { + if (bytesToRelease > capacity) { + throw new IllegalArgumentException("Releasing more bytes [" + bytesToRelease + "] than buffer capacity [" + capacity + "]."); + } + + int pagesToRelease = pageIndex(offset + bytesToRelease); + for (int i = 0; i < pagesToRelease; i++) { + pages.removeFirst(); + } + capacity -= bytesToRelease; + internalIndex = Math.max(internalIndex - bytesToRelease, 0); + offset = indexInPage(bytesToRelease + offset); + } + + /** + * This method will return an array of {@link ByteBuffer} representing the bytes from the beginning of + * this buffer up through the index argument that was passed. The buffers will be duplicates of the + * internal buffers, so any modifications to the markers {@link ByteBuffer#position()}, + * {@link ByteBuffer#limit()}, etc. will not modify this class. 
+ * + * @param to the index to slice up to + * @return the byte buffers + */ + public ByteBuffer[] sliceBuffersTo(long to) { + if (to > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters to [" + to + "]"); + } else if (to == 0) { + return EMPTY_BYTE_BUFFER_ARRAY; + } + long indexWithOffset = to + offset; + int pageCount = pageIndex(indexWithOffset); + int finalLimit = indexInPage(indexWithOffset); + if (finalLimit != 0) { + pageCount += 1; + } + + ByteBuffer[] buffers = new ByteBuffer[pageCount]; + Iterator<ByteBuffer> pageIterator = pages.iterator(); + ByteBuffer firstBuffer = pageIterator.next().duplicate(); + firstBuffer.position(firstBuffer.position() + offset); + buffers[0] = firstBuffer; + for (int i = 1; i < buffers.length; i++) { + buffers[i] = pageIterator.next().duplicate(); + } + if (finalLimit != 0) { + buffers[buffers.length - 1].limit(finalLimit); + } + + return buffers; + } + + /** + * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed + * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any + * modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc. will not + * modify this class. + * + * @param from the index to slice from + * @return the byte buffers + */ + public ByteBuffer[] sliceBuffersFrom(long from) { + if (from > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters from [" + from + "]"); + } else if (from == capacity) { + return EMPTY_BYTE_BUFFER_ARRAY; + } + long indexWithOffset = from + offset; + + int pageIndex = pageIndex(indexWithOffset); + int indexInPage = indexInPage(indexWithOffset); + + ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex]; + Iterator<ByteBuffer> pageIterator = pages.descendingIterator(); + for (int i = buffers.length - 1; i > 0; --i) { + buffers[i] = pageIterator.next().duplicate(); + } + ByteBuffer firstPostIndexBuffer = pageIterator.next().duplicate(); + firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); + buffers[0] = firstPostIndexBuffer; + + return buffers; + } + + public void incrementIndex(long delta) { + if (delta < 0) { + throw new IllegalArgumentException("Cannot increment an index with a negative delta [" + delta + "]"); + } + + long newIndex = delta + internalIndex; + if (newIndex > capacity) { + throw new IllegalArgumentException("Cannot increment an index [" + internalIndex + "] with a delta [" + delta + + "] that will result in a new index [" + newIndex + "] that is greater than the capacity [" + capacity + "]."); + } + internalIndex = newIndex; + } + + public long getIndex() { + return internalIndex; + } + + public long getCapacity() { + return capacity; + } + + public long getRemaining() { + long remaining = capacity - internalIndex; + assert remaining >= 0 : "The remaining [" + remaining + "] number of bytes should not be less than zero."; + return remaining; + } + + private int numPages(long capacity) { + final long numPages = (capacity + PAGE_MASK) >>> PAGE_SHIFT; + if (numPages > Integer.MAX_VALUE) { + throw new IllegalArgumentException("pageSize=" + (PAGE_MASK + 1) + " is too small for such a capacity: " + capacity); + } + return (int) numPages; + } + + private int pageIndex(long index) { + return (int) (index >>> PAGE_SHIFT); + } + + private int indexInPage(long index) { + return (int) (index & 
PAGE_MASK); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java deleted file mode 100644 index cbccd7333d6..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.nio; - -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -public class NetworkBytesReference extends BytesReference { - - private final BytesArray bytesArray; - private final ByteBuffer writeBuffer; - private final ByteBuffer readBuffer; - - private int writeIndex; - private int readIndex; - - public NetworkBytesReference(BytesArray bytesArray, int writeIndex, int readIndex) { - this.bytesArray = bytesArray; - this.writeIndex = writeIndex; - this.readIndex = readIndex; - this.writeBuffer = ByteBuffer.wrap(bytesArray.array()); - this.readBuffer = ByteBuffer.wrap(bytesArray.array()); - } - - public static NetworkBytesReference wrap(BytesArray bytesArray) { - return wrap(bytesArray, 0, 0); - } - - public static NetworkBytesReference wrap(BytesArray bytesArray, int writeIndex, int readIndex) { - if (readIndex > writeIndex) { - throw new IndexOutOfBoundsException("Read index [" + readIndex + "] was greater than write index [" + writeIndex + "]"); - } - return new NetworkBytesReference(bytesArray, writeIndex, readIndex); - } - - @Override - public byte get(int index) { - return bytesArray.get(index); - } - - @Override - public int length() { - return bytesArray.length(); - } - - @Override - public NetworkBytesReference slice(int from, int length) { - BytesReference ref = bytesArray.slice(from, length); - BytesArray newBytesArray; - if (ref instanceof BytesArray) { - newBytesArray = (BytesArray) ref; - } else { - newBytesArray = new BytesArray(ref.toBytesRef()); - } - - int newReadIndex = Math.min(Math.max(readIndex - from, 0), length); - int newWriteIndex = Math.min(Math.max(writeIndex - from, 0), length); - - return wrap(newBytesArray, newWriteIndex, newReadIndex); - } - - @Override - public BytesRef toBytesRef() { - return bytesArray.toBytesRef(); - } - - @Override - public long ramBytesUsed() { - return bytesArray.ramBytesUsed(); - } - - public int getWriteIndex() { - return writeIndex; - } - - public void incrementWrite(int delta) { - int newWriteIndex = writeIndex + delta; - if (newWriteIndex > bytesArray.length()) { - throw new IndexOutOfBoundsException("New write index [" + newWriteIndex + "] would be greater than length" + 
- " [" + bytesArray.length() + "]"); - } - - writeIndex = newWriteIndex; - } - - public int getWriteRemaining() { - return bytesArray.length() - writeIndex; - } - - public boolean hasWriteRemaining() { - return getWriteRemaining() > 0; - } - - public int getReadIndex() { - return readIndex; - } - - public void incrementRead(int delta) { - int newReadIndex = readIndex + delta; - if (newReadIndex > writeIndex) { - throw new IndexOutOfBoundsException("New read index [" + newReadIndex + "] would be greater than write" + - " index [" + writeIndex + "]"); - } - readIndex = newReadIndex; - } - - public int getReadRemaining() { - return writeIndex - readIndex; - } - - public boolean hasReadRemaining() { - return getReadRemaining() > 0; - } - - public ByteBuffer getWriteByteBuffer() { - writeBuffer.position(bytesArray.offset() + writeIndex); - writeBuffer.limit(bytesArray.offset() + bytesArray.length()); - return writeBuffer; - } - - public ByteBuffer getReadByteBuffer() { - readBuffer.position(bytesArray.offset() + readIndex); - readBuffer.limit(bytesArray.offset() + writeIndex); - return readBuffer; - } - - public static void vectorizedIncrementReadIndexes(Iterable references, int delta) { - Iterator refs = references.iterator(); - while (delta != 0) { - NetworkBytesReference ref = refs.next(); - int amountToInc = Math.min(ref.getReadRemaining(), delta); - ref.incrementRead(amountToInc); - delta -= amountToInc; - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java index f91acc5bbea..0abb6a67650 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java @@ -27,22 +27,35 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; public class WriteOperation { private final NioSocketChannel channel; private final ActionListener listener; - private final NetworkBytesReference[] references; + private final ByteBuffer[] buffers; + private final int[] offsets; + private final int length; + private int internalIndex; public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener listener) { this.channel = channel; this.listener = listener; - this.references = toArray(bytesReference); + this.buffers = toByteBuffers(bytesReference); + this.offsets = new int[buffers.length]; + int offset = 0; + for (int i = 0; i < buffers.length; i++) { + ByteBuffer buffer = buffers[i]; + offsets[i] = offset; + offset += buffer.remaining(); + } + length = offset; } - public NetworkBytesReference[] getByteReferences() { - return references; + public ByteBuffer[] getByteBuffers() { + return buffers; } public ActionListener getListener() { @@ -54,23 +67,46 @@ public class WriteOperation { } public boolean isFullyFlushed() { - return references[references.length - 1].hasReadRemaining() == false; + return internalIndex == length; } public int flush() throws IOException { - return channel.write(references); + int written = channel.write(getBuffersToWrite()); + internalIndex += written; + return written; } - private static NetworkBytesReference[] toArray(BytesReference reference) { - BytesRefIterator byteRefIterator = reference.iterator(); + private ByteBuffer[] 
getBuffersToWrite() { + int offsetIndex = getOffsetIndex(internalIndex); + + ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex]; + + ByteBuffer firstBuffer = buffers[offsetIndex].duplicate(); + firstBuffer.position(internalIndex - offsets[offsetIndex]); + postIndexBuffers[0] = firstBuffer; + int j = 1; + for (int i = (offsetIndex + 1); i < buffers.length; ++i) { + postIndexBuffers[j++] = buffers[i].duplicate(); + } + + return postIndexBuffers; + } + + private int getOffsetIndex(int offset) { + final int i = Arrays.binarySearch(offsets, offset); + return i < 0 ? (-(i + 1)) - 1 : i; + } + + private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) { + BytesRefIterator byteRefIterator = bytesReference.iterator(); + BytesRef r; + try { - // Most network messages are composed of three buffers - ArrayList<NetworkBytesReference> references = new ArrayList<>(3); + // Most network messages are composed of three buffers. + ArrayList<ByteBuffer> buffers = new ArrayList<>(3); + while ((r = byteRefIterator.next()) != null) { - references.add(NetworkBytesReference.wrap(new BytesArray(r), r.length, 0)); + buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length)); } - return references.toArray(new NetworkBytesReference[references.size()]); + return buffers.toArray(new ByteBuffer[buffers.size()]); } catch (IOException e) { // this is really an error since we don't do IO in our bytesreferences diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index b56731aee10..0f6c6715088 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -20,7 +20,7 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.NetworkBytesReference; +import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.SocketSelector; import java.io.IOException; @@ -28,7 +28,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -66,34 +65,22 @@ public class NioSocketChannel extends AbstractNioChannel { return socketSelector; } - public int write(NetworkBytesReference[] references) throws IOException { - int written; - if (references.length == 1) { - written = socketChannel.write(references[0].getReadByteBuffer()); + public int write(ByteBuffer[] buffers) throws IOException { + if (buffers.length == 1) { + return socketChannel.write(buffers[0]); } else { - ByteBuffer[] buffers = new ByteBuffer[references.length]; - for (int i = 0; i < references.length; ++i) { - buffers[i] = references[i].getReadByteBuffer(); - } - written = (int) socketChannel.write(buffers); + return (int) socketChannel.write(buffers); } - if (written <= 0) { - return written; - } - - NetworkBytesReference.vectorizedIncrementReadIndexes(Arrays.asList(references), written); - - return written; } - public int read(NetworkBytesReference reference) throws IOException { - int bytesRead = socketChannel.read(reference.getWriteByteBuffer()); + public int read(InboundChannelBuffer buffer) throws IOException { + int 
bytesRead = (int) socketChannel.read(buffer.sliceBuffersFrom(buffer.getIndex())); if (bytesRead == -1) { return bytesRead; } - reference.incrementWrite(bytesRead); + buffer.incrementIndex(bytesRead); return bytesRead; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java index 356af44c5ba..b2ba70fb236 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java @@ -36,11 +36,11 @@ public class TcpFrameDecoder { private int expectedMessageLength = -1; - public BytesReference decode(BytesReference bytesReference, int currentBufferSize) throws IOException { - if (currentBufferSize >= 6) { + public BytesReference decode(BytesReference bytesReference) throws IOException { + if (bytesReference.length() >= 6) { int messageLength = readHeaderBuffer(bytesReference); int totalLength = messageLength + HEADER_SIZE; - if (totalLength > currentBufferSize) { + if (totalLength > bytesReference.length()) { expectedMessageLength = totalLength; return null; } else if (totalLength == bytesReference.length()) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 8eeb32a976c..ae9fe0fdc93 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -19,25 +19,21 @@ package org.elasticsearch.transport.nio.channel; -import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ByteBufferReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.transport.nio.NetworkBytesReference; +import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; +import java.nio.ByteBuffer; public class TcpReadContext implements ReadContext { - private static final int DEFAULT_READ_LENGTH = 1 << 14; - private final TcpReadHandler handler; private final TcpNioSocketChannel channel; private final TcpFrameDecoder frameDecoder; - private final LinkedList references = new LinkedList<>(); - private int rawBytesCount = 0; + private final InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler) { this((TcpNioSocketChannel) channel, handler, new TcpFrameDecoder()); @@ -47,33 +43,27 @@ public class TcpReadContext implements ReadContext { this.handler = handler; this.channel = channel; this.frameDecoder = frameDecoder; - this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH]))); } @Override public int read() throws IOException { - NetworkBytesReference last = references.peekLast(); - if (last == null || last.hasWriteRemaining() == false) { - this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH]))); + if (channelBuffer.getRemaining() == 0) { + // Requiring one additional byte will ensure that a new page is allocated. 
+ channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1); } - int bytesRead = channel.read(references.getLast()); + int bytesRead = channel.read(channelBuffer); if (bytesRead == -1) { return bytesRead; } - rawBytesCount += bytesRead; - BytesReference message; // Frame decoder will throw an exception if the message is improperly formatted, the header is incorrect, // or the message is corrupted - while ((message = frameDecoder.decode(createCompositeBuffer(), rawBytesCount)) != null) { + while ((message = frameDecoder.decode(toBytesReference(channelBuffer))) != null) { int messageLengthWithHeader = message.length(); - NetworkBytesReference.vectorizedIncrementReadIndexes(references, messageLengthWithHeader); - trimDecodedMessages(messageLengthWithHeader); - rawBytesCount -= messageLengthWithHeader; try { BytesReference messageWithoutHeader = message.slice(6, message.length() - 6); @@ -84,32 +74,22 @@ public class TcpReadContext implements ReadContext { } } catch (Exception e) { handler.handleException(channel, e); + } finally { + channelBuffer.release(messageLengthWithHeader); } } return bytesRead; } - private CompositeBytesReference createCompositeBuffer() { - return new CompositeBytesReference(references.toArray(new BytesReference[references.size()])); - } - - private void trimDecodedMessages(int bytesToTrim) { - while (bytesToTrim != 0) { - NetworkBytesReference ref = references.getFirst(); - int readIndex = ref.getReadIndex(); - bytesToTrim -= readIndex; - if (readIndex == ref.length()) { - references.removeFirst(); - } else { - assert bytesToTrim == 0; - if (readIndex != 0) { - references.removeFirst(); - NetworkBytesReference slicedRef = ref.slice(readIndex, ref.length() - readIndex); - references.addFirst(slicedRef); - } - } - + private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) { + ByteBuffer[] writtenToBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex()); + ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length]; + for (int i = 0; i < references.length; ++i) { + references[i] = new ByteBufferReference(writtenToBuffers[i]); } + + return new CompositeBytesReference(references); } + } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/ByteBufferReferenceTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/ByteBufferReferenceTests.java deleted file mode 100644 index 335e3d2f778..00000000000 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/ByteBufferReferenceTests.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.transport.nio; - -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.test.ESTestCase; - -import java.nio.ByteBuffer; - -public class ByteBufferReferenceTests extends ESTestCase { - - private NetworkBytesReference buffer; - - public void testBasicGetByte() { - byte[] bytes = new byte[10]; - initializeBytes(bytes); - buffer = NetworkBytesReference.wrap(new BytesArray(bytes)); - - assertEquals(10, buffer.length()); - for (int i = 0 ; i < bytes.length; ++i) { - assertEquals(i, buffer.get(i)); - } - } - - public void testBasicGetByteWithOffset() { - byte[] bytes = new byte[10]; - initializeBytes(bytes); - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 8)); - - assertEquals(8, buffer.length()); - for (int i = 2 ; i < bytes.length; ++i) { - assertEquals(i, buffer.get(i - 2)); - } - } - - public void testBasicGetByteWithOffsetAndLimit() { - byte[] bytes = new byte[10]; - initializeBytes(bytes); - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 6)); - - assertEquals(6, buffer.length()); - for (int i = 2 ; i < bytes.length - 2; ++i) { - assertEquals(i, buffer.get(i - 2)); - } - } - - public void testGetWriteBufferRespectsWriteIndex() { - byte[] bytes = new byte[10]; - - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 8)); - - ByteBuffer writeByteBuffer = buffer.getWriteByteBuffer(); - - assertEquals(2, writeByteBuffer.position()); - assertEquals(10, writeByteBuffer.limit()); - - buffer.incrementWrite(2); - - writeByteBuffer = buffer.getWriteByteBuffer(); - assertEquals(4, writeByteBuffer.position()); - assertEquals(10, writeByteBuffer.limit()); - } - - public void testGetReadBufferRespectsReadIndex() { - byte[] bytes = new byte[10]; - - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 3, 6), 6, 0); - - ByteBuffer readByteBuffer = buffer.getReadByteBuffer(); - - assertEquals(3, readByteBuffer.position()); - assertEquals(9, readByteBuffer.limit()); - - buffer.incrementRead(2); - - readByteBuffer = buffer.getReadByteBuffer(); - assertEquals(5, readByteBuffer.position()); - assertEquals(9, readByteBuffer.limit()); - } - - public void testWriteAndReadRemaining() { - byte[] bytes = new byte[10]; - - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 8)); - - assertEquals(0, buffer.getReadRemaining()); - assertEquals(8, buffer.getWriteRemaining()); - - buffer.incrementWrite(3); - buffer.incrementRead(2); - - assertEquals(1, buffer.getReadRemaining()); - assertEquals(5, buffer.getWriteRemaining()); - } - - public void testBasicSlice() { - byte[] bytes = new byte[20]; - initializeBytes(bytes); - - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 18)); - - NetworkBytesReference slice = buffer.slice(4, 14); - - assertEquals(14, slice.length()); - assertEquals(0, slice.getReadIndex()); - assertEquals(0, slice.getWriteIndex()); - - for (int i = 6; i < 20; ++i) { - assertEquals(i, slice.get(i - 6)); - } - } - - public void testSliceWithReadAndWriteIndexes() { - byte[] bytes = new byte[20]; - initializeBytes(bytes); - - buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 18)); - - buffer.incrementWrite(9); - buffer.incrementRead(5); - - NetworkBytesReference slice = buffer.slice(6, 12); - - assertEquals(12, slice.length()); - assertEquals(0, slice.getReadIndex()); - assertEquals(3, slice.getWriteIndex()); - - for (int i = 8; i < 20; ++i) { - assertEquals(i, slice.get(i - 8)); - } - } - - private void initializeBytes(byte[] bytes) { - for (int i = 0 ; i < bytes.length; ++i) { - 
bytes[i] = (byte) i; - } - } -} diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java new file mode 100644 index 00000000000..7232a938710 --- /dev/null +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.elasticsearch.test.ESTestCase; + +import java.nio.ByteBuffer; + +public class InboundChannelBufferTests extends ESTestCase { + + private static final int PAGE_SIZE = 1 << 14; + + public void testNewBufferHasSinglePage() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); + assertEquals(0, channelBuffer.getIndex()); + } + + public void testExpandCapacity() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); + + channelBuffer.ensureCapacity(PAGE_SIZE + 1); + + assertEquals(PAGE_SIZE * 2, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE * 2, channelBuffer.getRemaining()); + } + + public void testExpandCapacityMultiplePages() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + + int multiple = randomInt(80); + channelBuffer.ensureCapacity(PAGE_SIZE + ((multiple * PAGE_SIZE) - randomInt(500))); + + assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getRemaining()); + } + + public void testExpandCapacityRespectsOffset() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); + + int offset = randomInt(300); + + channelBuffer.release(offset); + + assertEquals(PAGE_SIZE - offset, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE - offset, channelBuffer.getRemaining()); + + channelBuffer.ensureCapacity(PAGE_SIZE + 1); + + assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getRemaining()); + } + + public void testIncrementIndex() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(0, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); + + channelBuffer.incrementIndex(10); + + assertEquals(10, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE - 10, 
channelBuffer.getRemaining()); + } + + public void testIncrementIndexWithOffset() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(0, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); + + channelBuffer.release(10); + assertEquals(PAGE_SIZE - 10, channelBuffer.getRemaining()); + + channelBuffer.incrementIndex(10); + + assertEquals(10, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); + + channelBuffer.release(2); + assertEquals(8, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); + } + + public void testAccessByteBuffers() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + int pages = randomInt(50) + 5; + channelBuffer.ensureCapacity(pages * PAGE_SIZE); + + long capacity = channelBuffer.getCapacity(); + + ByteBuffer[] postIndexBuffers = channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()); + int i = 0; + for (ByteBuffer buffer : postIndexBuffers) { + while (buffer.hasRemaining()) { + buffer.put((byte) (i++ % 127)); + } + } + + int indexIncremented = 0; + int bytesReleased = 0; + while (indexIncremented < capacity) { + assertEquals(indexIncremented - bytesReleased, channelBuffer.getIndex()); + + long amountToInc = Math.min(randomInt(2000), channelBuffer.getRemaining()); + ByteBuffer[] postIndexBuffers2 = channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()); + assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased) % 127), postIndexBuffers2[0].get()); + ByteBuffer[] preIndexBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex()); + if (preIndexBuffers.length > 0) { + ByteBuffer preIndexBuffer = preIndexBuffers[preIndexBuffers.length - 1]; + assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased - 1) % 127), preIndexBuffer.get(preIndexBuffer.limit() - 1)); + } + if (randomBoolean()) { + long bytesToRelease = Math.min(randomInt(50), channelBuffer.getIndex()); + channelBuffer.release(bytesToRelease); + bytesReleased += bytesToRelease; + } + channelBuffer.incrementIndex(amountToInc); + indexIncremented += amountToInc; + } + + assertEquals(0, channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()).length); + } +} diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java index 8f270d11e5a..b547273d309 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java @@ -122,8 +122,7 @@ public class SocketEventHandlerTests extends ESTestCase { assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); BytesArray bytesArray = new BytesArray(new byte[1]); - NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(bytesArray); - channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, networkBuffer, mock(ActionListener.class))); + channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class))); when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(1); handler.handleWrite(channel); @@ -138,8 +137,7 @@ public class SocketEventHandlerTests extends ESTestCase { assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); BytesArray bytesArray = new BytesArray(new byte[1]); - 
NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(bytesArray, 1, 0); - channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, networkBuffer, mock(ActionListener.class))); + channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class))); when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(0); handler.handleWrite(channel); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java index 34a44ee4e4b..1b67d9d099b 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.transport.nio.channel.WriteContext; @@ -53,7 +54,7 @@ public class SocketSelectorTests extends ESTestCase { private TestSelectionKey selectionKey; private WriteContext writeContext; private ActionListener listener; - private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1])); + private BytesReference bufferReference = new BytesArray(new byte[1]); private Selector rawSelector; @Before @@ -294,8 +295,7 @@ public class SocketSelectorTests extends ESTestCase { socketSelector.preSelect(); - NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(new BytesArray(new byte[1])); - socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener)); + socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), new BytesArray(new byte[1]), listener)); socketSelector.scheduleForRegistration(unRegisteredChannel); TestSelectionKey testSelectionKey = new TestSelectionKey(0); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java index 351ac87eb56..0015d39a373 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.junit.Before; import java.io.IOException; +import java.nio.ByteBuffer; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -48,11 +49,7 @@ public class WriteOperationTests extends ESTestCase { WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); - when(channel.write(any())).thenAnswer(invocationOnMock -> { - NetworkBytesReference[] refs = (NetworkBytesReference[]) invocationOnMock.getArguments()[0]; - refs[0].incrementRead(10); - return 10; - }); + when(channel.write(any(ByteBuffer[].class))).thenReturn(10); writeOp.flush(); @@ -62,15 +59,10 @@ public class WriteOperationTests extends ESTestCase { public void testPartialFlush() throws IOException { WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); - when(channel.write(any())).thenAnswer(invocationOnMock -> { - 
NetworkBytesReference[] refs = (NetworkBytesReference[]) invocationOnMock.getArguments()[0]; - refs[0].incrementRead(5); - return 5; - }); + when(channel.write(any(ByteBuffer[].class))).thenReturn(5); writeOp.flush(); assertFalse(writeOp.isFullyFlushed()); - assertEquals(5, writeOp.getByteReferences()[0].getReadRemaining()); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java index 519828592be..450016b1dc3 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java @@ -43,10 +43,8 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write('S'); streamOutput.write(1); streamOutput.write(1); - streamOutput.write(0); - streamOutput.write(0); - assertNull(frameDecoder.decode(streamOutput.bytes(), 4)); + assertNull(frameDecoder.decode(streamOutput.bytes())); assertEquals(-1, frameDecoder.expectedMessageLength()); } @@ -56,7 +54,7 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write('S'); streamOutput.writeInt(-1); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 6); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(-1, frameDecoder.expectedMessageLength()); assertEquals(streamOutput.bytes(), message); @@ -70,7 +68,7 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write('E'); streamOutput.write('S'); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(6, message.length()); assertEquals(streamOutput.bytes().slice(0, 6), message); @@ -84,7 +82,7 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write('M'); streamOutput.write('A'); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(-1, frameDecoder.expectedMessageLength()); assertEquals(streamOutput.bytes(), message); @@ -98,7 +96,7 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write('M'); streamOutput.write('A'); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(9, frameDecoder.expectedMessageLength()); assertNull(message); @@ -113,7 +111,7 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write('A'); try { - frameDecoder.decode(streamOutput.bytes(), 8); + frameDecoder.decode(streamOutput.bytes()); fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(StreamCorruptedException.class)); @@ -134,7 +132,7 @@ public class TcpFrameDecoderTests extends ESTestCase { streamOutput.write(randomByte()); try { - frameDecoder.decode(streamOutput.bytes(), 7); + frameDecoder.decode(streamOutput.bytes()); fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(StreamCorruptedException.class)); @@ -158,7 +156,7 @@ public class TcpFrameDecoderTests extends ESTestCase { try { BytesReference bytes = streamOutput.bytes(); - frameDecoder.decode(bytes, bytes.length()); + frameDecoder.decode(bytes); fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(TcpTransport.HttpOnTransportException.class)); diff --git 
a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index 7586b5abd91..73583353f73 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -22,13 +22,13 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.NetworkBytesReference; +import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.Before; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Matchers.any; @@ -57,13 +57,13 @@ public class TcpReadContextTests extends ESTestCase { byte[] bytes = createMessage(messageLength); byte[] fullMessage = combineMessageAndHeader(bytes); - final AtomicInteger bufferCapacity = new AtomicInteger(); - when(channel.read(any(NetworkBytesReference.class))).thenAnswer(invocationOnMock -> { - NetworkBytesReference reference = (NetworkBytesReference) invocationOnMock.getArguments()[0]; - ByteBuffer buffer = reference.getWriteByteBuffer(); - bufferCapacity.set(reference.getWriteRemaining()); - buffer.put(fullMessage); - reference.incrementWrite(fullMessage.length); + final AtomicLong bufferCapacity = new AtomicLong(); + when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { + InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; + ByteBuffer byteBuffer = buffer.sliceBuffersFrom(buffer.getIndex())[0]; + bufferCapacity.set(buffer.getCapacity() - buffer.getIndex()); + byteBuffer.put(fullMessage); + buffer.incrementIndex(fullMessage.length); return fullMessage.length; }); @@ -82,15 +82,15 @@ public class TcpReadContextTests extends ESTestCase { byte[] fullPart1 = combineMessageAndHeader(part1, messageLength + messageLength); byte[] part2 = createMessage(messageLength); - final AtomicInteger bufferCapacity = new AtomicInteger(); + final AtomicLong bufferCapacity = new AtomicLong(); final AtomicReference bytes = new AtomicReference<>(); - when(channel.read(any(NetworkBytesReference.class))).thenAnswer(invocationOnMock -> { - NetworkBytesReference reference = (NetworkBytesReference) invocationOnMock.getArguments()[0]; - ByteBuffer buffer = reference.getWriteByteBuffer(); - bufferCapacity.set(reference.getWriteRemaining()); - buffer.put(bytes.get()); - reference.incrementWrite(bytes.get().length); + when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { + InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; + ByteBuffer byteBuffer = buffer.sliceBuffersFrom(buffer.getIndex())[0]; + bufferCapacity.set(buffer.getCapacity() - buffer.getIndex()); + byteBuffer.put(bytes.get()); + buffer.incrementIndex(bytes.get().length); return bytes.get().length; }); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java index 
16b53cd71b0..33b84590aaa 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java @@ -80,7 +80,7 @@ public class TcpWriteContextTests extends ESTestCase { assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0].getReadByteBuffer()); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteBuffers()[0]); } public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { @@ -97,7 +97,7 @@ public class TcpWriteContextTests extends ESTestCase { assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0].getReadByteBuffer()); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteBuffers()[0]); } public void testWriteIsQueuedInChannel() throws Exception {
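// Editor's note: the standalone sketch below is not part of the change above. It is a minimal, hypothetical
// illustration of the page arithmetic that InboundChannelBuffer's javadoc describes (16KB pages, a buffer-wide
// index mapped to a page via an unsigned shift and to an offset within that page via a mask). The class name
// PageArithmeticSketch is an assumption introduced only for this example; the constants mirror the ones in the diff.
class PageArithmeticSketch {
    private static final int PAGE_SIZE = 1 << 14;                 // 16KB pages, as in InboundChannelBuffer
    private static final int PAGE_MASK = PAGE_SIZE - 1;
    private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);

    static int pageIndex(long index) {
        return (int) (index >>> PAGE_SHIFT);                      // which page a buffer-wide index falls in
    }

    static int indexInPage(long index) {
        return (int) (index & PAGE_MASK);                         // offset of that index within its page
    }

    static long numPages(long capacity) {
        return (capacity + PAGE_MASK) >>> PAGE_SHIFT;             // pages needed to hold capacity bytes, rounded up
    }

    public static void main(String[] args) {
        long index = 40_000L;                                     // arbitrary example index
        System.out.println(pageIndex(index));                     // 2      (40_000 / 16_384)
        System.out.println(indexInPage(index));                   // 7_232  (40_000 - 2 * 16_384)
        System.out.println(numPages(index));                      // 3 pages are required for 40_000 bytes
    }
}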