From a705e1a9e30abe2b636ab38602bd1afea2b0c422 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 15 Jun 2018 14:01:03 -0600 Subject: [PATCH] Add byte array pooling to nio http transport (#31349) This is related to #28898. This PR implements pooling of bytes arrays when reading from the wire in the http server transport. In order to do this, we must integrate with netty reference counting. That manner in which this PR implements this is making Pages in InboundChannelBuffer reference counted. When we accessing the underlying page to pass to netty, we retain the page. When netty releases its bytebuf, it releases the underlying pages we have passed to it. --- .../util/concurrent/AbstractRefCounted.java | 4 +- .../common/util/concurrent/RefCounted.java | 2 +- .../util/concurrent/RefCountedTests.java | 9 +- .../nio/InboundChannelBuffer.java | 81 ++++++++++++++++- .../nio/InboundChannelBufferTests.java | 46 +++++++++- .../elasticsearch/transport/Netty4Plugin.java | 2 +- .../http/nio/HttpReadWriteHandler.java | 2 +- .../elasticsearch/http/nio/NettyAdaptor.java | 8 ++ .../http/nio/NioHttpServerTransport.java | 17 +++- .../elasticsearch/http/nio/PagedByteBuf.java | 75 +++++++++++++++ .../transport/nio/NioTransportPlugin.java | 5 +- .../http/nio/NioHttpServerTransportTests.java | 20 ++-- .../http/nio/PagedByteBufTests.java | 91 +++++++++++++++++++ .../common/network/NetworkModule.java | 2 +- .../elasticsearch/plugins/NetworkPlugin.java | 2 +- .../common/network/NetworkModuleTests.java | 6 +- .../elasticsearch/index/store/StoreTests.java | 15 +-- .../core/LocalStateCompositeXPackPlugin.java | 4 +- .../xpack/security/Security.java | 2 +- 19 files changed, 343 insertions(+), 50 deletions(-) rename {server => libs/core}/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java (92%) rename {server => libs/core}/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java (95%) rename {server => libs/core}/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java (94%) create mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java create mode 100644 plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java similarity index 92% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java rename to libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java index e0b8aea178c..a30e7490ff4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java @@ -19,8 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import org.apache.lucene.store.AlreadyClosedException; - import java.util.concurrent.atomic.AtomicInteger; /** @@ -68,7 +66,7 @@ public abstract class AbstractRefCounted implements RefCounted { } protected void alreadyClosed() { - throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); + throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); } /** diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java similarity index 95% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java rename to libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java index b2cc8b99c63..1e7bdc0e78f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java @@ -44,7 +44,7 @@ public interface RefCounted { * * @see #decRef * @see #tryIncRef() - * @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented. + * @throws IllegalStateException iff the reference counter can not be incremented. */ void incRef(); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java similarity index 94% rename from server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java rename to libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java index b2664b134ed..ebcf12482df 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java +++ b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.common.util.concurrent; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -70,14 +69,14 @@ public class RefCountedTests extends ESTestCase { try { counted.incRef(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]")); } try { counted.ensureOpen(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("closed")); } } @@ -116,7 +115,7 @@ public class RefCountedTests extends ESTestCase { try { counted.ensureOpen(); fail("expected to be closed"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("closed")); } assertThat(counted.refCount(), is(0)); @@ -140,7 +139,7 @@ public class RefCountedTests extends ESTestCase { public void ensureOpen() { if (closed.get()) { assert this.refCount() == 0; - throw new AlreadyClosedException("closed"); + throw new IllegalStateException("closed"); } } } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index f671b39d4d6..7c718237cd2 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -19,6 +19,7 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.nio.utils.ExceptionsHelper; import java.nio.ByteBuffer; @@ -41,6 +42,7 @@ public final class InboundChannelBuffer implements AutoCloseable { private static final int PAGE_MASK = PAGE_SIZE - 1; private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; + private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0]; private final ArrayDeque pages; @@ -152,6 +154,46 @@ public final class InboundChannelBuffer implements AutoCloseable { return buffers; } + /** + * This method will return an array of {@link Page} representing the bytes from the beginning of + * this buffer up through the index argument that was passed. The pages and buffers will be duplicates of + * the internal components, so any modifications to the markers {@link ByteBuffer#position()}, + * {@link ByteBuffer#limit()}, etc will not modify the this class. Additionally, this will internally + * retain the underlying pages, so the pages returned by this method must be closed. + * + * @param to the index to slice up to + * @return the pages + */ + public Page[] sliceAndRetainPagesTo(long to) { + if (to > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters to [" + to + "]"); + } else if (to == 0) { + return EMPTY_BYTE_PAGE_ARRAY; + } + long indexWithOffset = to + offset; + int pageCount = pageIndex(indexWithOffset); + int finalLimit = indexInPage(indexWithOffset); + if (finalLimit != 0) { + pageCount += 1; + } + + Page[] pages = new Page[pageCount]; + Iterator pageIterator = this.pages.iterator(); + Page firstPage = pageIterator.next().duplicate(); + ByteBuffer firstBuffer = firstPage.byteBuffer; + firstBuffer.position(firstBuffer.position() + offset); + pages[0] = firstPage; + for (int i = 1; i < pages.length; i++) { + pages[i] = pageIterator.next().duplicate(); + } + if (finalLimit != 0) { + pages[pages.length - 1].byteBuffer.limit(finalLimit); + } + + return pages; + } + /** * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any @@ -231,16 +273,49 @@ public final class InboundChannelBuffer implements AutoCloseable { public static class Page implements AutoCloseable { private final ByteBuffer byteBuffer; - private final Runnable closeable; + // This is reference counted as some implementations want to retain the byte pages by calling + // sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the + // pages, and safely close them when this channel buffer is done with them. The reference count + // would be 1 at that point, meaning that the pages will remain until the implementation closes + // theirs. + private final RefCountedCloseable refCountedCloseable; public Page(ByteBuffer byteBuffer, Runnable closeable) { + this(byteBuffer, new RefCountedCloseable(closeable)); + } + + private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) { this.byteBuffer = byteBuffer; - this.closeable = closeable; + this.refCountedCloseable = refCountedCloseable; + } + + private Page duplicate() { + refCountedCloseable.incRef(); + return new Page(byteBuffer.duplicate(), refCountedCloseable); + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; } @Override public void close() { - closeable.run(); + refCountedCloseable.decRef(); + } + + private static class RefCountedCloseable extends AbstractRefCounted { + + private final Runnable closeable; + + private RefCountedCloseable(Runnable closeable) { + super("byte array page"); + this.closeable = closeable; + } + + @Override + protected void closeInternal() { + closeable.run(); + } } } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java index 199a509cbfa..8dd72e869e8 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java @@ -31,7 +31,8 @@ public class InboundChannelBufferTests extends ESTestCase { private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES; private final Supplier defaultPageSupplier = () -> - new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> { + }); public void testNewBufferHasSinglePage() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); @@ -167,6 +168,49 @@ public class InboundChannelBufferTests extends ESTestCase { expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1)); } + public void testCloseRetainedPages() { + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Supplier supplier = () -> { + AtomicBoolean atomicBoolean = new AtomicBoolean(); + queue.add(atomicBoolean); + return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true)); + }; + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier); + channelBuffer.ensureCapacity(PAGE_SIZE * 4); + + assertEquals(4, queue.size()); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(PAGE_SIZE * 2); + + pages[1].close(); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + channelBuffer.close(); + + int i = 0; + for (AtomicBoolean closedRef : queue) { + if (i < 1) { + assertFalse(closedRef.get()); + } else { + assertTrue(closedRef.get()); + } + ++i; + } + + pages[0].close(); + + for (AtomicBoolean closedRef : queue) { + assertTrue(closedRef.get()); + } + } + public void testAccessByteBuffers() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index c6655b58bc3..70afcc86ad8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -87,8 +87,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index ea75c62dbbc..ad81719ebcb 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -78,7 +78,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler { @Override public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException { - int bytesConsumed = adaptor.read(channelBuffer.sliceBuffersTo(channelBuffer.getIndex())); + int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex())); Object message; while ((message = adaptor.pollInboundMessage()) != null) { handleRequest(message); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java index cf8c92bff90..41cb72aa322 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java @@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.nio.FlushOperation; +import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.WriteOperation; import java.nio.ByteBuffer; @@ -97,6 +98,13 @@ public class NettyAdaptor implements AutoCloseable { return byteBuf.readerIndex() - initialReaderIndex; } + public int read(InboundChannelBuffer.Page[] pages) { + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages); + int readableBytes = byteBuf.readableBytes(); + nettyChannel.writeInbound(byteBuf); + return readableBytes; + } + public Object pollInboundMessage() { return nettyChannel.readInbound(); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 5aac491a6ab..ba51f7c6848 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -32,12 +32,14 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; @@ -63,6 +65,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -103,6 +106,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope); + private final PageCacheRecycler pageCacheRecycler; + private final boolean tcpNoDelay; private final boolean tcpKeepAlive; private final boolean reuseAddress; @@ -115,9 +120,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; - public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) { + public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, + HttpServerTransport.Dispatcher dispatcher) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher); + this.pageCacheRecycler = pageCacheRecycler; ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -329,11 +336,15 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { @Override public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { NioHttpChannel nioChannel = new NioHttpChannel(channel); + java.util.function.Supplier pageSupplier = () -> { + Recycler.V bytes = pageCacheRecycler.bytePage(false); + return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); + }; HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, handlingSettings, corsConfig); Consumer exceptionHandler = (e) -> exceptionCaught(nioChannel, e); SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, - InboundChannelBuffer.allocatingInstance()); + new InboundChannelBuffer(pageSupplier)); nioChannel.setContext(context); return nioChannel; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java new file mode 100644 index 00000000000..40f3aeecfbc --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledHeapByteBuf; +import org.elasticsearch.nio.InboundChannelBuffer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class PagedByteBuf extends UnpooledHeapByteBuf { + + private final Runnable releasable; + + private PagedByteBuf(byte[] array, Runnable releasable) { + super(UnpooledByteBufAllocator.DEFAULT, array, array.length); + this.releasable = releasable; + } + + static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { + int componentCount = pages.length; + if (componentCount == 0) { + return Unpooled.EMPTY_BUFFER; + } else if (componentCount == 1) { + return byteBufFromPage(pages[0]); + } else { + int maxComponents = Math.max(16, componentCount); + final List components = new ArrayList<>(componentCount); + for (InboundChannelBuffer.Page page : pages) { + components.add(byteBufFromPage(page)); + } + return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, maxComponents, components); + } + } + + private static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { + ByteBuffer buffer = page.getByteBuffer(); + assert buffer.isDirect() == false && buffer.hasArray() : "Must be a heap buffer with an array"; + int offset = buffer.arrayOffset() + buffer.position(); + PagedByteBuf newByteBuf = new PagedByteBuf(buffer.array(), page::close); + return newByteBuf.slice(offset, buffer.remaining()); + } + + + @Override + protected void deallocate() { + try { + super.deallocate(); + } finally { + releasable.run(); + } + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 1cc94f18dd3..1da8e909b2d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -67,12 +67,13 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME, - () -> new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher)); + () -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, + dispatcher)); } } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index 48a5bf617a4..a0cb74f7cd2 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -88,12 +88,14 @@ public class NioHttpServerTransportTests extends ESTestCase { private NetworkService networkService; private ThreadPool threadPool; private MockBigArrays bigArrays; + private MockPageCacheRecycler pageRecycler; @Before public void setup() throws Exception { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); - bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + pageRecycler = new MockPageCacheRecycler(Settings.EMPTY); + bigArrays = new MockBigArrays(pageRecycler, new NoneCircuitBreakerService()); } @After @@ -186,7 +188,7 @@ public class NioHttpServerTransportTests extends ESTestCase { throw new AssertionError(); } }; - try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool, xContentRegistry(), dispatcher)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -210,13 +212,13 @@ public class NioHttpServerTransportTests extends ESTestCase { } public void testBindUnavailableAddress() { - try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), new NullDispatcher())) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); - try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), new NullDispatcher())) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start()); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } @@ -259,8 +261,8 @@ public class NioHttpServerTransportTests extends ESTestCase { settings = Settings.builder().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); } - try (NioHttpServerTransport transport = - new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), dispatcher)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -279,7 +281,7 @@ public class NioHttpServerTransportTests extends ESTestCase { assertNotNull(causeReference.get()); assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); } - + // public void testReadTimeout() throws Exception { // final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { // diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java new file mode 100644 index 00000000000..15bd18ecf69 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.test.ESTestCase; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +public class PagedByteBufTests extends ESTestCase { + + public void testReleasingPage() { + AtomicInteger integer = new AtomicInteger(0); + int pageCount = randomInt(10) + 1; + ArrayList pages = new ArrayList<>(); + for (int i = 0; i < pageCount; ++i) { + pages.add(new InboundChannelBuffer.Page(ByteBuffer.allocate(10), integer::incrementAndGet)); + } + + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages.toArray(new InboundChannelBuffer.Page[0])); + + assertEquals(0, integer.get()); + byteBuf.retain(); + byteBuf.release(); + assertEquals(0, integer.get()); + ByteBuf secondBuf = byteBuf.retainedSlice(); + byteBuf.release(); + assertEquals(0, integer.get()); + secondBuf.release(); + assertEquals(pageCount, integer.get()); + } + + public void testBytesAreUsed() { + byte[] bytes1 = new byte[10]; + byte[] bytes2 = new byte[10]; + + for (int i = 0; i < 10; ++i) { + bytes1[i] = (byte) i; + } + + for (int i = 10; i < 20; ++i) { + bytes2[i - 10] = (byte) i; + } + + InboundChannelBuffer.Page[] pages = new InboundChannelBuffer.Page[2]; + pages[0] = new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes1), () -> {}); + pages[1] = new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes2), () -> {}); + + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages); + assertEquals(20, byteBuf.readableBytes()); + + for (int i = 0; i < 20; ++i) { + assertEquals((byte) i, byteBuf.getByte(i)); + } + + InboundChannelBuffer.Page[] pages2 = new InboundChannelBuffer.Page[2]; + ByteBuffer firstBuffer = ByteBuffer.wrap(bytes1); + firstBuffer.position(2); + ByteBuffer secondBuffer = ByteBuffer.wrap(bytes2); + secondBuffer.limit(8); + pages2[0] = new InboundChannelBuffer.Page(firstBuffer, () -> {}); + pages2[1] = new InboundChannelBuffer.Page(secondBuffer, () -> {}); + + ByteBuf byteBuf2 = PagedByteBuf.byteBufFromPages(pages2); + assertEquals(16, byteBuf2.readableBytes()); + + for (int i = 2; i < 18; ++i) { + assertEquals((byte) i, byteBuf2.getByte(i - 2)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 70d26770a7b..cd8141ffa3c 100644 --- a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -116,7 +116,7 @@ public final class NetworkModule { this.transportClient = transportClient; for (NetworkPlugin plugin : plugins) { Map> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher); + pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher); if (transportClient == false) { for (Map.Entry> entry : httpTransportFactory.entrySet()) { registerHttpTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index df41036ffea..d33997fc82b 100644 --- a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -71,8 +71,8 @@ public interface NetworkPlugin { * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation. */ default Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { diff --git a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index ba74e373f88..8a4eb8e9177 100644 --- a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -159,8 +159,8 @@ public class NetworkModuleTests extends ModuleTestCase { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { @@ -198,8 +198,8 @@ public class NetworkModuleTests extends ModuleTestCase { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { @@ -233,8 +233,8 @@ public class NetworkModuleTests extends ModuleTestCase { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9352d978e6e..2cea9bb3646 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -39,7 +39,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -145,18 +144,8 @@ public class StoreTests extends ESTestCase { store.decRef(); assertThat(store.refCount(), Matchers.equalTo(0)); assertFalse(store.tryIncRef()); - try { - store.incRef(); - fail(" expected exception"); - } catch (AlreadyClosedException ex) { - - } - try { - store.ensureOpen(); - fail(" expected exception"); - } catch (AlreadyClosedException ex) { - - } + expectThrows(IllegalStateException.class, store::incRef); + expectThrows(IllegalStateException.class, store::ensureOpen); } public void testVerifyingIndexOutput() throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 44fd61e1693..796cae375e3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -286,14 +286,14 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { Map> transports = new HashMap<>(); filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher))); + pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher))); return transports; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 664745b1920..c0bd7882c41 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -843,8 +843,8 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {