From 790f8102e9c808ec1e497f77bb0e93dc5534d346 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 11 Dec 2018 11:55:41 -0700 Subject: [PATCH] Modify `BigArrays` to take name of circuit breaker (#36461) This commit modifies BigArrays to take a circuit breaker name and the circuit breaking service. The default instance of BigArrays that is passed around everywhere always uses the request breaker. At the network level, we want to be using the inflight request breaker. So this change will allow that. Additionally, as this change moves away from a single instance of BigArrays, the class is modified to not be a Releasable anymore. Releasing big arrays was always dispatching to the PageCacheRecycler, so this change makes the PageCacheRecycler the class that needs to be managed and torn-down. Finally, this commit closes #31435 be making the serialization of transport messages use the inflight request breaker. With this change, we no longer push the global BigArrays instnace to the network level. --- .../elasticsearch/transport/Netty4Plugin.java | 8 +++--- .../transport/netty4/Netty4Transport.java | 9 ++++--- .../Netty4SizeHeaderFrameDecoderTests.java | 7 +++-- .../transport/netty4/Netty4TransportIT.java | 9 +++---- .../transport/netty4/Netty4UtilsTests.java | 3 ++- .../netty4/NettyTransportMultiPortTests.java | 7 +++-- .../netty4/SimpleNetty4TransportTests.java | 4 +-- .../transport/nio/NioTransport.java | 7 ++--- .../transport/nio/NioTransportPlugin.java | 10 +++---- .../transport/nio/NioTransportIT.java | 10 +++---- .../nio/SimpleNioTransportTests.java | 5 ++-- .../client/transport/TransportClient.java | 8 +++--- .../common/network/NetworkModule.java | 2 +- .../elasticsearch/common/util/BigArrays.java | 27 +++++++++---------- .../common/util/PageCacheRecycler.java | 6 +++++ .../java/org/elasticsearch/node/Node.java | 8 +++--- .../elasticsearch/plugins/NetworkPlugin.java | 6 ++--- .../elasticsearch/transport/TcpTransport.java | 11 +++++--- .../common/network/NetworkModuleTests.java | 6 ++--- .../common/util/BigArraysTests.java | 4 +-- .../elasticsearch/index/IndexModuleTests.java | 3 ++- .../transport/TcpTransportTests.java | 4 +-- .../bytes/AbstractBytesReferenceTestCase.java | 3 ++- .../common/util/MockBigArrays.java | 3 ++- .../test/transport/MockTransportService.java | 4 +-- .../transport/MockTcpTransport.java | 8 +++--- .../transport/nio/MockNioTransport.java | 7 ++--- .../transport/nio/MockNioTransportPlugin.java | 9 +++---- .../nio/SimpleMockNioTransportTests.java | 5 ++-- .../xpack/core/XPackClientPlugin.java | 4 +-- .../netty4/SecurityNetty4Transport.java | 6 ++--- .../core/LocalStateCompositeXPackPlugin.java | 12 ++++----- .../xpack/security/Security.java | 10 +++---- .../netty4/SecurityNetty4ServerTransport.java | 6 ++--- .../transport/nio/SecurityNioTransport.java | 5 ++-- .../SecurityNetty4ServerTransportTests.java | 5 ++-- ...pleSecurityNetty4ServerTransportTests.java | 4 +-- .../nio/SimpleSecurityNioTransportTests.java | 6 ++--- 38 files changed, 121 insertions(+), 140 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index c2c841f889a..609186fc3c3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -77,13 +77,11 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 60ef6b21b1f..8e8a8abcc37 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -51,7 +51,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; @@ -104,9 +104,10 @@ public class Netty4Transport extends TcpTransport { private volatile Bootstrap clientBootstrap; private volatile NioEventLoopGroup eventLoopGroup; - public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); + public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + CircuitBreakerService circuitBreakerService) { + super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings)); this.workerCount = WORKER_COUNT.get(settings); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index a711bb690e3..f467a8ad8f3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -24,9 +24,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.mocksocket.MockSocket; import org.elasticsearch.test.ESTestCase; @@ -65,8 +64,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase { public void startThreadPool() { threadPool = new ThreadPool(settings); NetworkService networkService = new NetworkService(Collections.emptyList()); - BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays, + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); + nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); nettyTransport.start(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java index b93e09b5364..ae0109a83b0 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; @@ -90,13 +89,13 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase { public static class TestPlugin extends Plugin implements NetworkPlugin { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap("exception-throwing", - () -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, bigArrays, + () -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } } @@ -105,10 +104,10 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase { Settings settings, ThreadPool threadPool, NetworkService networkService, - BigArrays bigArrays, + PageCacheRecycler recycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService); + super(settings, Version.CURRENT, threadPool, networkService, recycler, namedWriteableRegistry, circuitBreakerService); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java index 8372a8540b8..87731ef5fb1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -36,7 +37,7 @@ import java.io.IOException; public class Netty4UtilsTests extends ESTestCase { private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; - private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false); + private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST); public void testToChannelBufferWithEmptyRef() throws IOException { ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index 785c4cfb114..ecb720173f7 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -24,9 +24,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -118,9 +117,9 @@ public class NettyTransportMultiPortTests extends ESTestCase { } private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { - BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), - bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); transport.start(); assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 006fbae6c42..5c3279eaf15 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.test.transport.MockTransportService; @@ -55,7 +55,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()), - BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { + PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 61a8fb7c69a..c5f5b414c07 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -66,16 +65,14 @@ public class NioTransport extends TcpTransport { (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); - protected final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; private volatile Function clientChannelFactory; - protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.pageCacheRecycler = pageCacheRecycler; + super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index fd57ea20b1c..9c73cbc5a66 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -56,14 +56,12 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NIO_TRANSPORT_NAME, - () -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, - namedWriteableRegistry, circuitBreakerService)); + () -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, + circuitBreakerService)); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java index 0c1bad79ee8..c6452e0be91 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; @@ -90,22 +89,21 @@ public class NioTransportIT extends NioIntegTestCase { public static class TestPlugin extends Plugin implements NetworkPlugin { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap("exception-throwing", - () -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, bigArrays, pageCacheRecycler, + () -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } } - ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, - circuitBreakerService); + super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 70acc2d1482..9dd3bc3b957 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; @@ -58,8 +57,8 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); - Transport transport = new NioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, - new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) { + Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings), + namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index b6c57a39a5a..8450fe8d714 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Module; @@ -182,8 +183,8 @@ public abstract class TransportClient extends AbstractClient { settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); - BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService); - resourcesToClose.add(bigArrays); + BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); + resourcesToClose.add(pageCacheRecycler); modules.add(settingsModule); NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null); @@ -194,6 +195,7 @@ public abstract class TransportClient extends AbstractClient { UUIDs.randomBase64UUID()), null, Collections.emptySet()); modules.add((b -> { b.bind(BigArrays.class).toInstance(bigArrays); + b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); @@ -374,7 +376,7 @@ public abstract class TransportClient extends AbstractClient { closeables.add(plugin); } closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS)); - closeables.add(injector.getInstance(BigArrays.class)); + closeables.add(injector.getInstance(PageCacheRecycler.class)); IOUtils.closeWhileHandlingException(closeables); } diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java index cd8141ffa3c..edc815af417 100644 --- a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -122,7 +122,7 @@ public final class NetworkModule { registerHttpTransport(entry.getKey(), entry.getValue()); } } - Map> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler, + Map> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index 12c511311ea..e6a6e98ad64 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -33,9 +33,9 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Arrays; /** Utility class to work with arrays. */ -public class BigArrays implements Releasable { +public class BigArrays { - public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, false); + public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, CircuitBreaker.REQUEST); /** Page size in bytes: 16KB */ public static final int PAGE_SIZE_IN_BYTES = 1 << 14; @@ -83,11 +83,6 @@ public class BigArrays implements Releasable { return index == (int) index; } - @Override - public void close() { - recycler.close(); - } - private abstract static class AbstractArrayWrapper extends AbstractArray implements BigArray { static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ByteArrayWrapper.class); @@ -369,24 +364,26 @@ public class BigArrays implements Releasable { } final PageCacheRecycler recycler; - final CircuitBreakerService breakerService; - final boolean checkBreaker; + private final CircuitBreakerService breakerService; + private final boolean checkBreaker; private final BigArrays circuitBreakingInstance; + private final String breakerName; - public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) { + public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName) { // Checking the breaker is disabled if not specified - this(recycler, breakerService, false); + this(recycler, breakerService, breakerName, false); } - // public for tests - public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) { + protected BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName, + boolean checkBreaker) { this.checkBreaker = checkBreaker; this.recycler = recycler; this.breakerService = breakerService; + this.breakerName = breakerName; if (checkBreaker) { this.circuitBreakingInstance = this; } else { - this.circuitBreakingInstance = new BigArrays(recycler, breakerService, true); + this.circuitBreakingInstance = new BigArrays(recycler, breakerService, breakerName, true); } } @@ -400,7 +397,7 @@ public class BigArrays implements Releasable { */ void adjustBreaker(final long delta, final boolean isDataAlreadyCreated) { if (this.breakerService != null) { - CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.REQUEST); + CircuitBreaker breaker = this.breakerService.getBreaker(breakerName); if (this.checkBreaker) { // checking breaker means potentially tripping, but it doesn't // have to if the delta is negative diff --git a/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java b/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java index 66bfdbdc194..b0d2dfca465 100644 --- a/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java +++ b/server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java @@ -59,6 +59,12 @@ public class PageCacheRecycler implements Releasable { private final Recycler longPage; private final Recycler objectPage; + public static final PageCacheRecycler NON_RECYCLING_INSTANCE; + + static { + NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build()); + } + @Override public void close() { Releasables.close(true, bytePage, intPage, longPage, objectPage); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 2543abce0e6..aa49b10b8b1 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -57,6 +57,7 @@ import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; @@ -375,7 +376,7 @@ public class Node implements Closeable { PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); - resourcesToClose.add(bigArrays); + resourcesToClose.add(pageCacheRecycler); modules.add(settingsModule); List namedWriteables = Stream.of( NetworkModule.getNamedWriteables().stream(), @@ -516,6 +517,7 @@ public class Node implements Closeable { b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService); b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); b.bind(BigArrays.class).toInstance(bigArrays); + b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler); b.bind(ScriptService.class).toInstance(scriptModule.getScriptService()); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); @@ -845,7 +847,7 @@ public class Node implements Closeable { toClose.add(injector.getInstance(NodeEnvironment.class)); - toClose.add(injector.getInstance(BigArrays.class)); + toClose.add(injector.getInstance(PageCacheRecycler.class)); if (logger.isTraceEnabled()) { logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()); @@ -928,7 +930,7 @@ public class Node implements Closeable { * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing */ BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { - return new BigArrays(pageCacheRecycler, circuitBreakerService); + return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); } /** diff --git a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index d33997fc82b..56e95200ccc 100644 --- a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -58,11 +58,9 @@ public interface NetworkPlugin { * Returns a map of {@link Transport} suppliers. * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation. */ - default Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + default Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 659c264ab37..c45525632de 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -59,6 +59,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; @@ -179,6 +180,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final Version version; protected final ThreadPool threadPool; protected final BigArrays bigArrays; + protected final PageCacheRecycler pageCacheRecycler; protected final NetworkService networkService; protected final Set profileSettings; @@ -206,15 +208,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final TransportKeepAlive keepAlive; private final String nodeName; - public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, BigArrays bigArrays, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { super(settings); this.settings = settings; this.profileSettings = getProfileSettings(settings); this.version = version; this.threadPool = threadPool; - this.bigArrays = bigArrays; + this.bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS); + this.pageCacheRecycler = pageCacheRecycler; this.circuitBreakerService = circuitBreakerService; this.namedWriteableRegistry = namedWriteableRegistry; this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings); diff --git a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index 7f39b6d3b88..0dbc3efe075 100644 --- a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -131,7 +131,7 @@ public class NetworkModuleTests extends ESTestCase { Supplier custom = () -> null; // content doesn't matter we check reference equality NetworkPlugin plugin = new NetworkPlugin() { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, @@ -187,7 +187,7 @@ public class NetworkModuleTests extends ESTestCase { Supplier def = FakeHttpTransport::new; NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, @@ -222,7 +222,7 @@ public class NetworkModuleTests extends ESTestCase { Supplier customTransport = () -> null; NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, diff --git a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 9b6816e2ee8..83ad34da096 100644 --- a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -358,7 +358,7 @@ public class BigArraysTests extends ESTestCase { .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - BigArrays bigArrays = new BigArrays(null, hcbs, false).withCircuitBreaking(); + BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); final int size = scaledRandomIntBetween(10, maxSize / 16); BigArray array = (BigArray) create.invoke(bigArrays, size); @@ -422,7 +422,7 @@ public class BigArraysTests extends ESTestCase { .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - BigArrays bigArrays = new BigArrays(null, hcbs, false); + BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST); return (withBreaking ? bigArrays.withCircuitBreaking() : bigArrays); } diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index e27465078ce..df411f81e3c 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.util.SetOnce.AlreadySetException; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -132,7 +133,7 @@ public class IndexModuleTests extends ESTestCase { threadPool = new TestThreadPool("test"); circuitBreakerService = new NoneCircuitBreakerService(); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); - bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService); + bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap()); clusterService = ClusterServiceUtils.createClusterService(threadPool); nodeEnvironment = new NodeEnvironment(settings, environment); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 27e4eb7ca39..b52ae8759f2 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -32,8 +32,8 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -191,7 +191,7 @@ public class TcpTransportTests extends ESTestCase { AtomicReference messageCaptor = new AtomicReference<>(); try { TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, - new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) { + PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) { @Override protected FakeServerChannel bind(String name, InetSocketAddress address) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 7b1073e954f..dee1ff7edbe 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -22,6 +22,7 @@ package org.elasticsearch.common.bytes; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -37,7 +38,7 @@ import java.util.Arrays; public abstract class AbstractBytesReferenceTestCase extends ESTestCase { protected static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; - protected final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false); + protected final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST); public void testGet() throws IOException { int length = randomIntBetween(1, PAGE_SIZE * 3); diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index ad7002436c7..586303c23c4 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -26,6 +26,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -86,7 +87,7 @@ public class MockBigArrays extends BigArrays { } private MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) { - super(recycler, breakerService, checkBreaker); + super(recycler, breakerService, CircuitBreaker.REQUEST, checkBreaker); this.recycler = recycler; this.breakerService = breakerService; long seed; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 09f735cb0ab..c0879af2dfa 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.RunOnce; @@ -113,8 +112,7 @@ public final class MockTransportService extends TransportService { settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()), - BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry, - new NoneCircuitBreakerService()); + new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index b5cdbeae2c9..6b328f18625 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.internal.io.IOUtils; @@ -98,15 +99,14 @@ public class MockTcpTransport extends TcpTransport { public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { - this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, - Version.CURRENT); + this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT); } public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, Version mockVersion) { - super("mock-tcp-transport", settings, mockVersion, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, - networkService); + super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService, + namedWriteableRegistry, networkService); // we have our own crazy cached threadpool this one is not bounded at all... // using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX)); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 780913ae9f6..cbadcfa1f38 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; @@ -69,16 +68,14 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class MockNioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(MockNioTransport.class); - private final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; private volatile MockTcpChannelFactory clientChannelFactory; - public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super("mock-nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.pageCacheRecycler = pageCacheRecycler; + super("mock-nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java index ceabe72ee44..f136185fac3 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransportPlugin.java @@ -22,7 +22,6 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; @@ -39,13 +38,11 @@ public class MockNioTransportPlugin extends Plugin implements NetworkPlugin { public static final String MOCK_NIO_TRANSPORT_NAME = "mock-nio"; @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(MOCK_NIO_TRANSPORT_NAME, - () -> new MockNioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, + () -> new MockNioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index 5facd0211f6..84b921cf88f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; @@ -57,8 +56,8 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); - Transport transport = new MockNioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, - new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) { + Transport transport = new MockNioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings), + namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 18474b08e49..10c30a2f331 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -461,7 +460,6 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl public Map> getTransports( final Settings settings, final ThreadPool threadPool, - final BigArrays bigArrays, final PageCacheRecycler pageCacheRecycler, final CircuitBreakerService circuitBreakerService, final NamedWriteableRegistry namedWriteableRegistry, @@ -477,7 +475,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl throw new RuntimeException(e); } return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4Transport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, sslService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index f30fe43e2b5..3712c27c435 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -61,11 +61,11 @@ public class SecurityNetty4Transport extends Netty4Transport { final Version version, final ThreadPool threadPool, final NetworkService networkService, - final BigArrays bigArrays, + final PageCacheRecycler pageCacheRecycler, final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, final SSLService sslService) { - super(settings, version, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); if (sslEnabled) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index d0686bd03d9..2b19eea5b56 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -276,15 +276,13 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { Map> transports = new HashMap<>(); - transports.putAll(super.getTransports(settings, threadPool, bigArrays, pageCacheRecycler, - circuitBreakerService, namedWriteableRegistry, networkService)); - filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getTransports(settings, threadPool, bigArrays, + transports.putAll(super.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, + networkService)); + filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService))); return transports; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 7ebd50d1944..6b41ab02071 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -889,20 +889,18 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw } @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, + public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { + NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { if (transportClientMode || enabled == false) { // don't register anything if we are not enabled, or in transport client mode return Collections.emptyMap(); } Map> transports = new HashMap<>(); transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, - networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); return Collections.unmodifiableMap(transports); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java index d74aa65e94b..8cb1085d3aa 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; @@ -29,12 +29,12 @@ public class SecurityNetty4ServerTransport extends SecurityNetty4Transport { final Version version, final ThreadPool threadPool, final NetworkService networkService, - final BigArrays bigArrays, + final PageCacheRecycler pageCacheRecycler, final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, final SSLService sslService) { - super(settings, version, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, sslService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService); this.authenticator = authenticator; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index b7392a32c4e..0642f635ed0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; @@ -75,10 +74,10 @@ public class SecurityNioTransport extends NioTransport { private final boolean sslEnabled; public SecurityNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, - BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, SSLService sslService) { - super(settings, version, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); this.authenticator = authenticator; this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java index dc6bffe5c72..32f98280118 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.ssl.SSLService; import org.junit.Before; import javax.net.ssl.SSLEngine; - import java.nio.file.Path; import java.util.Collections; import java.util.Locale; @@ -72,7 +71,7 @@ public class SecurityNetty4ServerTransportTests extends ESTestCase { Version.CURRENT, mock(ThreadPool.class), new NetworkService(Collections.emptyList()), - mock(BigArrays.class), + mock(PageCacheRecycler.class), mock(NamedWriteableRegistry.class), mock(CircuitBreakerService.class), null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index bf31240148a..d5a5cedc19a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; @@ -34,7 +34,7 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu .put(settings) .put("xpack.security.transport.ssl.enabled", true).build(); Transport transport = new SecurityNetty4ServerTransport(settings1, version, threadPool, - networkService, BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, + networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) { @Override diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 3ccd6c5f7e3..11d2e2a9848 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; @@ -34,9 +33,8 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans Settings settings1 = Settings.builder() .put(settings) .put("xpack.security.transport.ssl.enabled", true).build(); - Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, - new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService(), null, - createSSLService(settings1)) { + Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, new MockPageCacheRecycler(settings), + namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,