diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java similarity index 92% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java rename to libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java index e0b8aea178c..a30e7490ff4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java @@ -19,8 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import org.apache.lucene.store.AlreadyClosedException; - import java.util.concurrent.atomic.AtomicInteger; /** @@ -68,7 +66,7 @@ public abstract class AbstractRefCounted implements RefCounted { } protected void alreadyClosed() { - throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); + throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); } /** diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java similarity index 95% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java rename to libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java index b2cc8b99c63..1e7bdc0e78f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java @@ -44,7 +44,7 @@ public interface RefCounted { * * @see #decRef * @see #tryIncRef() - * @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented. + * @throws IllegalStateException iff the reference counter can not be incremented. */ void incRef(); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java similarity index 94% rename from server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java rename to libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java index b2664b134ed..ebcf12482df 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java +++ b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.common.util.concurrent; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -70,14 +69,14 @@ public class RefCountedTests extends ESTestCase { try { counted.incRef(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]")); } try { counted.ensureOpen(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("closed")); } } @@ -116,7 +115,7 @@ public class RefCountedTests extends ESTestCase { try { counted.ensureOpen(); fail("expected to be closed"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("closed")); } assertThat(counted.refCount(), is(0)); @@ -140,7 +139,7 @@ public class RefCountedTests extends ESTestCase { public void ensureOpen() { if (closed.get()) { assert this.refCount() == 0; - throw new AlreadyClosedException("closed"); + throw new IllegalStateException("closed"); } } } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index f671b39d4d6..7c718237cd2 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -19,6 +19,7 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.nio.utils.ExceptionsHelper; import java.nio.ByteBuffer; @@ -41,6 +42,7 @@ public final class InboundChannelBuffer implements AutoCloseable { private static final int PAGE_MASK = PAGE_SIZE - 1; private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; + private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0]; private final ArrayDeque pages; @@ -152,6 +154,46 @@ public final class InboundChannelBuffer implements AutoCloseable { return buffers; } + /** + * This method will return an array of {@link Page} representing the bytes from the beginning of + * this buffer up through the index argument that was passed. The pages and buffers will be duplicates of + * the internal components, so any modifications to the markers {@link ByteBuffer#position()}, + * {@link ByteBuffer#limit()}, etc will not modify the this class. Additionally, this will internally + * retain the underlying pages, so the pages returned by this method must be closed. + * + * @param to the index to slice up to + * @return the pages + */ + public Page[] sliceAndRetainPagesTo(long to) { + if (to > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters to [" + to + "]"); + } else if (to == 0) { + return EMPTY_BYTE_PAGE_ARRAY; + } + long indexWithOffset = to + offset; + int pageCount = pageIndex(indexWithOffset); + int finalLimit = indexInPage(indexWithOffset); + if (finalLimit != 0) { + pageCount += 1; + } + + Page[] pages = new Page[pageCount]; + Iterator pageIterator = this.pages.iterator(); + Page firstPage = pageIterator.next().duplicate(); + ByteBuffer firstBuffer = firstPage.byteBuffer; + firstBuffer.position(firstBuffer.position() + offset); + pages[0] = firstPage; + for (int i = 1; i < pages.length; i++) { + pages[i] = pageIterator.next().duplicate(); + } + if (finalLimit != 0) { + pages[pages.length - 1].byteBuffer.limit(finalLimit); + } + + return pages; + } + /** * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any @@ -231,16 +273,49 @@ public final class InboundChannelBuffer implements AutoCloseable { public static class Page implements AutoCloseable { private final ByteBuffer byteBuffer; - private final Runnable closeable; + // This is reference counted as some implementations want to retain the byte pages by calling + // sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the + // pages, and safely close them when this channel buffer is done with them. The reference count + // would be 1 at that point, meaning that the pages will remain until the implementation closes + // theirs. + private final RefCountedCloseable refCountedCloseable; public Page(ByteBuffer byteBuffer, Runnable closeable) { + this(byteBuffer, new RefCountedCloseable(closeable)); + } + + private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) { this.byteBuffer = byteBuffer; - this.closeable = closeable; + this.refCountedCloseable = refCountedCloseable; + } + + private Page duplicate() { + refCountedCloseable.incRef(); + return new Page(byteBuffer.duplicate(), refCountedCloseable); + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; } @Override public void close() { - closeable.run(); + refCountedCloseable.decRef(); + } + + private static class RefCountedCloseable extends AbstractRefCounted { + + private final Runnable closeable; + + private RefCountedCloseable(Runnable closeable) { + super("byte array page"); + this.closeable = closeable; + } + + @Override + protected void closeInternal() { + closeable.run(); + } } } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java index 199a509cbfa..8dd72e869e8 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java @@ -31,7 +31,8 @@ public class InboundChannelBufferTests extends ESTestCase { private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES; private final Supplier defaultPageSupplier = () -> - new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> { + }); public void testNewBufferHasSinglePage() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); @@ -167,6 +168,49 @@ public class InboundChannelBufferTests extends ESTestCase { expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1)); } + public void testCloseRetainedPages() { + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Supplier supplier = () -> { + AtomicBoolean atomicBoolean = new AtomicBoolean(); + queue.add(atomicBoolean); + return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true)); + }; + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier); + channelBuffer.ensureCapacity(PAGE_SIZE * 4); + + assertEquals(4, queue.size()); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(PAGE_SIZE * 2); + + pages[1].close(); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + channelBuffer.close(); + + int i = 0; + for (AtomicBoolean closedRef : queue) { + if (i < 1) { + assertFalse(closedRef.get()); + } else { + assertTrue(closedRef.get()); + } + ++i; + } + + pages[0].close(); + + for (AtomicBoolean closedRef : queue) { + assertTrue(closedRef.get()); + } + } + public void testAccessByteBuffers() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index c6655b58bc3..70afcc86ad8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -87,8 +87,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index ea75c62dbbc..ad81719ebcb 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -78,7 +78,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler { @Override public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException { - int bytesConsumed = adaptor.read(channelBuffer.sliceBuffersTo(channelBuffer.getIndex())); + int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex())); Object message; while ((message = adaptor.pollInboundMessage()) != null) { handleRequest(message); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java index cf8c92bff90..41cb72aa322 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java @@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.nio.FlushOperation; +import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.WriteOperation; import java.nio.ByteBuffer; @@ -97,6 +98,13 @@ public class NettyAdaptor implements AutoCloseable { return byteBuf.readerIndex() - initialReaderIndex; } + public int read(InboundChannelBuffer.Page[] pages) { + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages); + int readableBytes = byteBuf.readableBytes(); + nettyChannel.writeInbound(byteBuf); + return readableBytes; + } + public Object pollInboundMessage() { return nettyChannel.readInbound(); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 5aac491a6ab..ba51f7c6848 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -32,12 +32,14 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; @@ -63,6 +65,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -103,6 +106,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope); + private final PageCacheRecycler pageCacheRecycler; + private final boolean tcpNoDelay; private final boolean tcpKeepAlive; private final boolean reuseAddress; @@ -115,9 +120,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; - public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) { + public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, + HttpServerTransport.Dispatcher dispatcher) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher); + this.pageCacheRecycler = pageCacheRecycler; ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -329,11 +336,15 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { @Override public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { NioHttpChannel nioChannel = new NioHttpChannel(channel); + java.util.function.Supplier pageSupplier = () -> { + Recycler.V bytes = pageCacheRecycler.bytePage(false); + return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); + }; HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, handlingSettings, corsConfig); Consumer exceptionHandler = (e) -> exceptionCaught(nioChannel, e); SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, - InboundChannelBuffer.allocatingInstance()); + new InboundChannelBuffer(pageSupplier)); nioChannel.setContext(context); return nioChannel; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java new file mode 100644 index 00000000000..40f3aeecfbc --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledHeapByteBuf; +import org.elasticsearch.nio.InboundChannelBuffer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class PagedByteBuf extends UnpooledHeapByteBuf { + + private final Runnable releasable; + + private PagedByteBuf(byte[] array, Runnable releasable) { + super(UnpooledByteBufAllocator.DEFAULT, array, array.length); + this.releasable = releasable; + } + + static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { + int componentCount = pages.length; + if (componentCount == 0) { + return Unpooled.EMPTY_BUFFER; + } else if (componentCount == 1) { + return byteBufFromPage(pages[0]); + } else { + int maxComponents = Math.max(16, componentCount); + final List components = new ArrayList<>(componentCount); + for (InboundChannelBuffer.Page page : pages) { + components.add(byteBufFromPage(page)); + } + return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, maxComponents, components); + } + } + + private static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { + ByteBuffer buffer = page.getByteBuffer(); + assert buffer.isDirect() == false && buffer.hasArray() : "Must be a heap buffer with an array"; + int offset = buffer.arrayOffset() + buffer.position(); + PagedByteBuf newByteBuf = new PagedByteBuf(buffer.array(), page::close); + return newByteBuf.slice(offset, buffer.remaining()); + } + + + @Override + protected void deallocate() { + try { + super.deallocate(); + } finally { + releasable.run(); + } + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 1cc94f18dd3..1da8e909b2d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -67,12 +67,13 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME, - () -> new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher)); + () -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, + dispatcher)); } } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index 48a5bf617a4..a0cb74f7cd2 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -88,12 +88,14 @@ public class NioHttpServerTransportTests extends ESTestCase { private NetworkService networkService; private ThreadPool threadPool; private MockBigArrays bigArrays; + private MockPageCacheRecycler pageRecycler; @Before public void setup() throws Exception { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); - bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + pageRecycler = new MockPageCacheRecycler(Settings.EMPTY); + bigArrays = new MockBigArrays(pageRecycler, new NoneCircuitBreakerService()); } @After @@ -186,7 +188,7 @@ public class NioHttpServerTransportTests extends ESTestCase { throw new AssertionError(); } }; - try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool, xContentRegistry(), dispatcher)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -210,13 +212,13 @@ public class NioHttpServerTransportTests extends ESTestCase { } public void testBindUnavailableAddress() { - try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), new NullDispatcher())) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); - try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), new NullDispatcher())) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start()); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } @@ -259,8 +261,8 @@ public class NioHttpServerTransportTests extends ESTestCase { settings = Settings.builder().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); } - try (NioHttpServerTransport transport = - new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), dispatcher)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -279,7 +281,7 @@ public class NioHttpServerTransportTests extends ESTestCase { assertNotNull(causeReference.get()); assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); } - + // public void testReadTimeout() throws Exception { // final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { // diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java new file mode 100644 index 00000000000..15bd18ecf69 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.test.ESTestCase; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +public class PagedByteBufTests extends ESTestCase { + + public void testReleasingPage() { + AtomicInteger integer = new AtomicInteger(0); + int pageCount = randomInt(10) + 1; + ArrayList pages = new ArrayList<>(); + for (int i = 0; i < pageCount; ++i) { + pages.add(new InboundChannelBuffer.Page(ByteBuffer.allocate(10), integer::incrementAndGet)); + } + + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages.toArray(new InboundChannelBuffer.Page[0])); + + assertEquals(0, integer.get()); + byteBuf.retain(); + byteBuf.release(); + assertEquals(0, integer.get()); + ByteBuf secondBuf = byteBuf.retainedSlice(); + byteBuf.release(); + assertEquals(0, integer.get()); + secondBuf.release(); + assertEquals(pageCount, integer.get()); + } + + public void testBytesAreUsed() { + byte[] bytes1 = new byte[10]; + byte[] bytes2 = new byte[10]; + + for (int i = 0; i < 10; ++i) { + bytes1[i] = (byte) i; + } + + for (int i = 10; i < 20; ++i) { + bytes2[i - 10] = (byte) i; + } + + InboundChannelBuffer.Page[] pages = new InboundChannelBuffer.Page[2]; + pages[0] = new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes1), () -> {}); + pages[1] = new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes2), () -> {}); + + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages); + assertEquals(20, byteBuf.readableBytes()); + + for (int i = 0; i < 20; ++i) { + assertEquals((byte) i, byteBuf.getByte(i)); + } + + InboundChannelBuffer.Page[] pages2 = new InboundChannelBuffer.Page[2]; + ByteBuffer firstBuffer = ByteBuffer.wrap(bytes1); + firstBuffer.position(2); + ByteBuffer secondBuffer = ByteBuffer.wrap(bytes2); + secondBuffer.limit(8); + pages2[0] = new InboundChannelBuffer.Page(firstBuffer, () -> {}); + pages2[1] = new InboundChannelBuffer.Page(secondBuffer, () -> {}); + + ByteBuf byteBuf2 = PagedByteBuf.byteBufFromPages(pages2); + assertEquals(16, byteBuf2.readableBytes()); + + for (int i = 2; i < 18; ++i) { + assertEquals((byte) i, byteBuf2.getByte(i - 2)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 70d26770a7b..cd8141ffa3c 100644 --- a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -116,7 +116,7 @@ public final class NetworkModule { this.transportClient = transportClient; for (NetworkPlugin plugin : plugins) { Map> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher); + pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher); if (transportClient == false) { for (Map.Entry> entry : httpTransportFactory.entrySet()) { registerHttpTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index df41036ffea..d33997fc82b 100644 --- a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -71,8 +71,8 @@ public interface NetworkPlugin { * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation. */ default Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { diff --git a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index ba74e373f88..8a4eb8e9177 100644 --- a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -159,8 +159,8 @@ public class NetworkModuleTests extends ModuleTestCase { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { @@ -198,8 +198,8 @@ public class NetworkModuleTests extends ModuleTestCase { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { @@ -233,8 +233,8 @@ public class NetworkModuleTests extends ModuleTestCase { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9352d978e6e..2cea9bb3646 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -39,7 +39,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -145,18 +144,8 @@ public class StoreTests extends ESTestCase { store.decRef(); assertThat(store.refCount(), Matchers.equalTo(0)); assertFalse(store.tryIncRef()); - try { - store.incRef(); - fail(" expected exception"); - } catch (AlreadyClosedException ex) { - - } - try { - store.ensureOpen(); - fail(" expected exception"); - } catch (AlreadyClosedException ex) { - - } + expectThrows(IllegalStateException.class, store::incRef); + expectThrows(IllegalStateException.class, store::ensureOpen); } public void testVerifyingIndexOutput() throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 44fd61e1693..796cae375e3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -286,14 +286,14 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { Map> transports = new HashMap<>(); filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher))); + pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher))); return transports; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 664745b1920..c0bd7882c41 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -843,8 +843,8 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {