diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index c2c841f889a..609186fc3c3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -77,13 +77,11 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 60ef6b21b1f..8e8a8abcc37 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -51,7 +51,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; @@ -104,9 +104,10 @@ public class Netty4Transport extends TcpTransport { private volatile Bootstrap clientBootstrap; private volatile NioEventLoopGroup eventLoopGroup; - public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); + public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + CircuitBreakerService circuitBreakerService) { + super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings)); this.workerCount = WORKER_COUNT.get(settings); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index a711bb690e3..f467a8ad8f3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -24,9 +24,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.mocksocket.MockSocket; import org.elasticsearch.test.ESTestCase; @@ -65,8 +64,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase { public void startThreadPool() { threadPool = new ThreadPool(settings); NetworkService networkService = new NetworkService(Collections.emptyList()); - BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays, + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); + nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); nettyTransport.start(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java index b93e09b5364..ae0109a83b0 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; @@ -90,13 +89,13 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase { public static class TestPlugin extends Plugin implements NetworkPlugin { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap("exception-throwing", - () -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, bigArrays, + () -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } } @@ -105,10 +104,10 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase { Settings settings, ThreadPool threadPool, NetworkService networkService, - BigArrays bigArrays, + PageCacheRecycler recycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService); + super(settings, Version.CURRENT, threadPool, networkService, recycler, namedWriteableRegistry, circuitBreakerService); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java index 8372a8540b8..87731ef5fb1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -36,7 +37,7 @@ import java.io.IOException; public class Netty4UtilsTests extends ESTestCase { private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; - private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false); + private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST); public void testToChannelBufferWithEmptyRef() throws IOException { ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index 785c4cfb114..ecb720173f7 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -24,9 +24,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -118,9 +117,9 @@ public class NettyTransportMultiPortTests extends ESTestCase { } private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { - BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), - bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); transport.start(); assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 006fbae6c42..5c3279eaf15 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.test.transport.MockTransportService; @@ -55,7 +55,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()), - BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { + PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 61a8fb7c69a..c5f5b414c07 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -66,16 +65,14 @@ public class NioTransport extends TcpTransport { (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); - protected final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; private volatile Function clientChannelFactory; - protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.pageCacheRecycler = pageCacheRecycler; + super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index fd57ea20b1c..9c73cbc5a66 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -56,14 +56,12 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NIO_TRANSPORT_NAME, - () -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, - namedWriteableRegistry, circuitBreakerService)); + () -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, + circuitBreakerService)); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java index 0c1bad79ee8..c6452e0be91 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; @@ -90,22 +89,21 @@ public class NioTransportIT extends NioIntegTestCase { public static class TestPlugin extends Plugin implements NetworkPlugin { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap("exception-throwing", - () -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, bigArrays, pageCacheRecycler, + () -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } } - ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, - circuitBreakerService); + super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 70acc2d1482..9dd3bc3b957 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; @@ -58,8 +57,8 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); - Transport transport = new NioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, - new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) { + Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings), + namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index b6c57a39a5a..8450fe8d714 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Module; @@ -182,8 +183,8 @@ public abstract class TransportClient extends AbstractClient { settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); - BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService); - resourcesToClose.add(bigArrays); + BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); + resourcesToClose.add(pageCacheRecycler); modules.add(settingsModule); NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null); @@ -194,6 +195,7 @@ public abstract class TransportClient extends AbstractClient { UUIDs.randomBase64UUID()), null, Collections.emptySet()); modules.add((b -> { b.bind(BigArrays.class).toInstance(bigArrays); + b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); @@ -374,7 +376,7 @@ public abstract class TransportClient extends AbstractClient { closeables.add(plugin); } closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS)); - closeables.add(injector.getInstance(BigArrays.class)); + closeables.add(injector.getInstance(PageCacheRecycler.class)); IOUtils.closeWhileHandlingException(closeables); } diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java index cd8141ffa3c..edc815af417 100644 --- a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -122,7 +122,7 @@ public final class NetworkModule { registerHttpTransport(entry.getKey(), entry.getValue()); } } - Map> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler, + Map> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index 12c511311ea..e6a6e98ad64 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -33,9 +33,9 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Arrays; /** Utility class to work with arrays. */ -public class BigArrays implements Releasable { +public class BigArrays { - public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, false); + public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, CircuitBreaker.REQUEST); /** Page size in bytes: 16KB */ public static final int PAGE_SIZE_IN_BYTES = 1 << 14; @@ -83,11 +83,6 @@ public class BigArrays implements Releasable { return index == (int) index; } - @Override - public void close() { - recycler.close(); - } - private abstract static class AbstractArrayWrapper extends AbstractArray implements BigArray { static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ByteArrayWrapper.class); @@ -369,24 +364,26 @@ public class BigArrays implements Releasable { } final PageCacheRecycler recycler; - final CircuitBreakerService breakerService; - final boolean checkBreaker; + private final CircuitBreakerService breakerService; + private final boolean checkBreaker; private final BigArrays circuitBreakingInstance; + private final String breakerName; - public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) { + public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName) { // Checking the breaker is disabled if not specified - this(recycler, breakerService, false); + this(recycler, breakerService, breakerName, false); } - // public for tests - public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) { + protected BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName, + boolean checkBreaker) { this.checkBreaker = checkBreaker; this.recycler = recycler; this.breakerService = breakerService; + this.breakerName = breakerName; if (checkBreaker) { this.circuitBreakingInstance = this; } else { - this.circuitBreakingInstance = new BigArrays(recycler, breakerService, true); + this.circuitBreakingInstance = new BigArrays(recycler, breakerService, breakerName, true); } } @@ -400,7 +397,7 @@ public class BigArrays implements Releasable { */ void adjustBreaker(final long delta, final boolean isDataAlreadyCreated) { if (this.breakerService != null) { - CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.REQUEST); + CircuitBreaker breaker = this.breakerService.getBreaker(breakerName); if (this.checkBreaker) { // checking breaker means potentially tripping, but it doesn't // have to if the delta is negative diff --git a/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java b/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java index 66bfdbdc194..b0d2dfca465 100644 --- a/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java +++ b/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java @@ -59,6 +59,12 @@ public class PageCacheRecycler implements Releasable { private final Recycler longPage; private final Recycler objectPage; + public static final PageCacheRecycler NON_RECYCLING_INSTANCE; + + static { + NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build()); + } + @Override public void close() { Releasables.close(true, bytePage, intPage, longPage, objectPage); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 2543abce0e6..aa49b10b8b1 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -57,6 +57,7 @@ import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; @@ -375,7 +376,7 @@ public class Node implements Closeable { PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); - resourcesToClose.add(bigArrays); + resourcesToClose.add(pageCacheRecycler); modules.add(settingsModule); List namedWriteables = Stream.of( NetworkModule.getNamedWriteables().stream(), @@ -516,6 +517,7 @@ public class Node implements Closeable { b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService); b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); b.bind(BigArrays.class).toInstance(bigArrays); + b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler); b.bind(ScriptService.class).toInstance(scriptModule.getScriptService()); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); @@ -845,7 +847,7 @@ public class Node implements Closeable { toClose.add(injector.getInstance(NodeEnvironment.class)); - toClose.add(injector.getInstance(BigArrays.class)); + toClose.add(injector.getInstance(PageCacheRecycler.class)); if (logger.isTraceEnabled()) { logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()); @@ -928,7 +930,7 @@ public class Node implements Closeable { * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing */ BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { - return new BigArrays(pageCacheRecycler, circuitBreakerService); + return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); } /** diff --git a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index d33997fc82b..56e95200ccc 100644 --- a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -58,11 +58,9 @@ public interface NetworkPlugin { * Returns a map of {@link Transport} suppliers. * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation. */ - default Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + default Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 659c264ab37..c45525632de 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -59,6 +59,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; @@ -179,6 +180,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final Version version; protected final ThreadPool threadPool; protected final BigArrays bigArrays; + protected final PageCacheRecycler pageCacheRecycler; protected final NetworkService networkService; protected final Set profileSettings; @@ -206,15 +208,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final TransportKeepAlive keepAlive; private final String nodeName; - public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, BigArrays bigArrays, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { super(settings); this.settings = settings; this.profileSettings = getProfileSettings(settings); this.version = version; this.threadPool = threadPool; - this.bigArrays = bigArrays; + this.bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS); + this.pageCacheRecycler = pageCacheRecycler; this.circuitBreakerService = circuitBreakerService; this.namedWriteableRegistry = namedWriteableRegistry; this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings); diff --git a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index 7f39b6d3b88..0dbc3efe075 100644 --- a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -131,7 +131,7 @@ public class NetworkModuleTests extends ESTestCase { Supplier custom = () -> null; // content doesn't matter we check reference equality NetworkPlugin plugin = new NetworkPlugin() { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, @@ -187,7 +187,7 @@ public class NetworkModuleTests extends ESTestCase { Supplier def = FakeHttpTransport::new; NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, @@ -222,7 +222,7 @@ public class NetworkModuleTests extends ESTestCase { Supplier customTransport = () -> null; NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, diff --git a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 9b6816e2ee8..83ad34da096 100644 --- a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -358,7 +358,7 @@ public class BigArraysTests extends ESTestCase { .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - BigArrays bigArrays = new BigArrays(null, hcbs, false).withCircuitBreaking(); + BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); final int size = scaledRandomIntBetween(10, maxSize / 16); BigArray array = (BigArray) create.invoke(bigArrays, size); @@ -422,7 +422,7 @@ public class BigArraysTests extends ESTestCase { .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - BigArrays bigArrays = new BigArrays(null, hcbs, false); + BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST); return (withBreaking ? bigArrays.withCircuitBreaking() : bigArrays); } diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index e27465078ce..df411f81e3c 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.util.SetOnce.AlreadySetException; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -132,7 +133,7 @@ public class IndexModuleTests extends ESTestCase { threadPool = new TestThreadPool("test"); circuitBreakerService = new NoneCircuitBreakerService(); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); - bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService); + bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap()); clusterService = ClusterServiceUtils.createClusterService(threadPool); nodeEnvironment = new NodeEnvironment(settings, environment); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 27e4eb7ca39..b52ae8759f2 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -32,8 +32,8 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -191,7 +191,7 @@ public class TcpTransportTests extends ESTestCase { AtomicReference messageCaptor = new AtomicReference<>(); try { TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, - new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) { + PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) { @Override protected FakeServerChannel bind(String name, InetSocketAddress address) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 7b1073e954f..dee1ff7edbe 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -22,6 +22,7 @@ package org.elasticsearch.common.bytes; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -37,7 +38,7 @@ import java.util.Arrays; public abstract class AbstractBytesReferenceTestCase extends ESTestCase { protected static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; - protected final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false); + protected final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST); public void testGet() throws IOException { int length = randomIntBetween(1, PAGE_SIZE * 3); diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index ad7002436c7..586303c23c4 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -26,6 +26,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -86,7 +87,7 @@ public class MockBigArrays extends BigArrays { } private MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) { - super(recycler, breakerService, checkBreaker); + super(recycler, breakerService, CircuitBreaker.REQUEST, checkBreaker); this.recycler = recycler; this.breakerService = breakerService; long seed; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 09f735cb0ab..c0879af2dfa 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.RunOnce; @@ -113,8 +112,7 @@ public final class MockTransportService extends TransportService { settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()), - BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry, - new NoneCircuitBreakerService()); + new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index b5cdbeae2c9..6b328f18625 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.internal.io.IOUtils; @@ -98,15 +99,14 @@ public class MockTcpTransport extends TcpTransport { public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { - this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, - Version.CURRENT); + this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT); } public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, Version mockVersion) { - super("mock-tcp-transport", settings, mockVersion, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, - networkService); + super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService, + namedWriteableRegistry, networkService); // we have our own crazy cached threadpool this one is not bounded at all... // using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX)); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 780913ae9f6..cbadcfa1f38 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; @@ -69,16 +68,14 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class MockNioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(MockNioTransport.class); - private final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; private volatile MockTcpChannelFactory clientChannelFactory; - public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("mock-nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.pageCacheRecycler = pageCacheRecycler; + super("mock-nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java index ceabe72ee44..f136185fac3 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java @@ -22,7 +22,6 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; @@ -39,13 +38,11 @@ public class MockNioTransportPlugin extends Plugin implements NetworkPlugin { public static final String MOCK_NIO_TRANSPORT_NAME = "mock-nio"; @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(MOCK_NIO_TRANSPORT_NAME, - () -> new MockNioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, + () -> new MockNioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index 5facd0211f6..84b921cf88f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; @@ -57,8 +56,8 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); - Transport transport = new MockNioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, - new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) { + Transport transport = new MockNioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings), + namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 18474b08e49..10c30a2f331 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -461,7 +460,6 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl public Map> getTransports( final Settings settings, final ThreadPool threadPool, - final BigArrays bigArrays, final PageCacheRecycler pageCacheRecycler, final CircuitBreakerService circuitBreakerService, final NamedWriteableRegistry namedWriteableRegistry, @@ -477,7 +475,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl throw new RuntimeException(e); } return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4Transport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, sslService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index f30fe43e2b5..3712c27c435 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -61,11 +61,11 @@ public class SecurityNetty4Transport extends Netty4Transport { final Version version, final ThreadPool threadPool, final NetworkService networkService, - final BigArrays bigArrays, + final PageCacheRecycler pageCacheRecycler, final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, final SSLService sslService) { - super(settings, version, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); if (sslEnabled) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index d0686bd03d9..2b19eea5b56 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -276,15 +276,13 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { Map> transports = new HashMap<>(); - transports.putAll(super.getTransports(settings, threadPool, bigArrays, pageCacheRecycler, - circuitBreakerService, namedWriteableRegistry, networkService)); - filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getTransports(settings, threadPool, bigArrays, + transports.putAll(super.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, + networkService)); + filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService))); return transports; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 7ebd50d1944..6b41ab02071 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -889,20 +889,18 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { if (transportClientMode || enabled == false) { // don't register anything if we are not enabled, or in transport client mode return Collections.emptyMap(); } Map> transports = new HashMap<>(); transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); return Collections.unmodifiableMap(transports); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java index d74aa65e94b..8cb1085d3aa 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; @@ -29,12 +29,12 @@ public class SecurityNetty4ServerTransport extends SecurityNetty4Transport { final Version version, final ThreadPool threadPool, final NetworkService networkService, - final BigArrays bigArrays, + final PageCacheRecycler pageCacheRecycler, final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, final SSLService sslService) { - super(settings, version, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, sslService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService); this.authenticator = authenticator; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index b7392a32c4e..0642f635ed0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; @@ -75,10 +74,10 @@ public class SecurityNioTransport extends NioTransport { private final boolean sslEnabled; public SecurityNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, - BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, SSLService sslService) { - super(settings, version, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); this.authenticator = authenticator; this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java index dc6bffe5c72..32f98280118 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.ssl.SSLService; import org.junit.Before; import javax.net.ssl.SSLEngine; - import java.nio.file.Path; import java.util.Collections; import java.util.Locale; @@ -72,7 +71,7 @@ public class SecurityNetty4ServerTransportTests extends ESTestCase { Version.CURRENT, mock(ThreadPool.class), new NetworkService(Collections.emptyList()), - mock(BigArrays.class), + mock(PageCacheRecycler.class), mock(NamedWriteableRegistry.class), mock(CircuitBreakerService.class), null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index bf31240148a..d5a5cedc19a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; @@ -34,7 +34,7 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu .put(settings) .put("xpack.security.transport.ssl.enabled", true).build(); Transport transport = new SecurityNetty4ServerTransport(settings1, version, threadPool, - networkService, BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, + networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) { @Override diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 3ccd6c5f7e3..11d2e2a9848 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; @@ -34,9 +33,8 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans Settings settings1 = Settings.builder() .put(settings) .put("xpack.security.transport.ssl.enabled", true).build(); - Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, - new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService(), null, - createSSLService(settings1)) { + Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, new MockPageCacheRecycler(settings), + namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,