Modify `BigArrays` to take name of circuit breaker (#36461)
This commit modifies BigArrays to take a circuit breaker name and the circuit breaking service. The default instance of BigArrays that is passed around everywhere always uses the request breaker. At the network level, we want to be using the inflight request breaker. So this change will allow that. Additionally, as this change moves away from a single instance of BigArrays, the class is modified to not be a Releasable anymore. Releasing big arrays was always dispatching to the PageCacheRecycler, so this change makes the PageCacheRecycler the class that needs to be managed and torn-down. Finally, this commit closes #31435 be making the serialization of transport messages use the inflight request breaker. With this change, we no longer push the global BigArrays instnace to the network level.
This commit is contained in:
parent
a07fb312fc
commit
790f8102e9
|
@ -77,13 +77,11 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
|
||||
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
|
||||
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -104,9 +104,10 @@ public class Netty4Transport extends TcpTransport {
|
|||
private volatile Bootstrap clientBootstrap;
|
||||
private volatile NioEventLoopGroup eventLoopGroup;
|
||||
|
||||
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
|
||||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
|
||||
this.workerCount = WORKER_COUNT.get(settings);
|
||||
|
||||
|
|
|
@ -24,9 +24,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.mocksocket.MockSocket;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -65,8 +64,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
|
|||
public void startThreadPool() {
|
||||
threadPool = new ThreadPool(settings);
|
||||
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
|
||||
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
|
||||
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
|
||||
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler,
|
||||
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.plugins.NetworkPlugin;
|
||||
|
@ -90,13 +89,13 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase {
|
|||
public static class TestPlugin extends Plugin implements NetworkPlugin {
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
return Collections.singletonMap("exception-throwing",
|
||||
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, bigArrays,
|
||||
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, pageCacheRecycler,
|
||||
namedWriteableRegistry, circuitBreakerService));
|
||||
}
|
||||
}
|
||||
|
@ -105,10 +104,10 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase {
|
|||
Settings settings,
|
||||
ThreadPool threadPool,
|
||||
NetworkService networkService,
|
||||
BigArrays bigArrays,
|
||||
PageCacheRecycler recycler,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
super(settings, Version.CURRENT, threadPool, networkService, recycler, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
|
|||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -36,7 +37,7 @@ import java.io.IOException;
|
|||
public class Netty4UtilsTests extends ESTestCase {
|
||||
|
||||
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
|
||||
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
|
||||
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST);
|
||||
|
||||
public void testToChannelBufferWithEmptyRef() throws IOException {
|
||||
ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0));
|
||||
|
|
|
@ -24,9 +24,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -118,9 +117,9 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
|
||||
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
|
||||
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
|
||||
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
||||
recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
||||
transport.start();
|
||||
|
||||
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
@ -55,7 +55,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
|
|||
ClusterSettings clusterSettings, boolean doHandshake) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
|
||||
@Override
|
||||
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
|
@ -66,16 +65,14 @@ public class NioTransport extends TcpTransport {
|
|||
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
|
||||
(s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope);
|
||||
|
||||
protected final PageCacheRecycler pageCacheRecycler;
|
||||
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
|
||||
private volatile NioGroup nioGroup;
|
||||
private volatile Function<DiscoveryNode, TcpChannelFactory> clientChannelFactory;
|
||||
|
||||
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
|
||||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super("nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
this.pageCacheRecycler = pageCacheRecycler;
|
||||
super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -56,14 +56,12 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
return Collections.singletonMap(NIO_TRANSPORT_NAME,
|
||||
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler,
|
||||
namedWriteableRegistry, circuitBreakerService));
|
||||
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry,
|
||||
circuitBreakerService));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.plugins.NetworkPlugin;
|
||||
|
@ -90,22 +89,21 @@ public class NioTransportIT extends NioIntegTestCase {
|
|||
public static class TestPlugin extends Plugin implements NetworkPlugin {
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
return Collections.singletonMap("exception-throwing",
|
||||
() -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, bigArrays, pageCacheRecycler,
|
||||
() -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, pageCacheRecycler,
|
||||
namedWriteableRegistry, circuitBreakerService));
|
||||
}
|
||||
}
|
||||
|
||||
ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService,
|
||||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
|
||||
circuitBreakerService);
|
||||
super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
@ -58,8 +57,8 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
|
|||
ClusterSettings clusterSettings, boolean doHandshake) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||
Transport transport = new NioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings),
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
|
||||
@Override
|
||||
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.client.support.AbstractClient;
|
|||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
|
@ -182,8 +183,8 @@ public abstract class TransportClient extends AbstractClient {
|
|||
settingsModule.getClusterSettings());
|
||||
resourcesToClose.add(circuitBreakerService);
|
||||
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
|
||||
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
|
||||
resourcesToClose.add(bigArrays);
|
||||
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
|
||||
resourcesToClose.add(pageCacheRecycler);
|
||||
modules.add(settingsModule);
|
||||
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
|
||||
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
|
||||
|
@ -194,6 +195,7 @@ public abstract class TransportClient extends AbstractClient {
|
|||
UUIDs.randomBase64UUID()), null, Collections.emptySet());
|
||||
modules.add((b -> {
|
||||
b.bind(BigArrays.class).toInstance(bigArrays);
|
||||
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
|
||||
b.bind(PluginsService.class).toInstance(pluginsService);
|
||||
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
|
||||
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
|
||||
|
@ -374,7 +376,7 @@ public abstract class TransportClient extends AbstractClient {
|
|||
closeables.add(plugin);
|
||||
}
|
||||
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
|
||||
closeables.add(injector.getInstance(BigArrays.class));
|
||||
closeables.add(injector.getInstance(PageCacheRecycler.class));
|
||||
IOUtils.closeWhileHandlingException(closeables);
|
||||
}
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ public final class NetworkModule {
|
|||
registerHttpTransport(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
|
||||
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
|
||||
circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
|
||||
registerTransport(entry.getKey(), entry.getValue());
|
||||
|
|
|
@ -33,9 +33,9 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
|||
import java.util.Arrays;
|
||||
|
||||
/** Utility class to work with arrays. */
|
||||
public class BigArrays implements Releasable {
|
||||
public class BigArrays {
|
||||
|
||||
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, false);
|
||||
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, CircuitBreaker.REQUEST);
|
||||
|
||||
/** Page size in bytes: 16KB */
|
||||
public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
|
||||
|
@ -83,11 +83,6 @@ public class BigArrays implements Releasable {
|
|||
return index == (int) index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
recycler.close();
|
||||
}
|
||||
|
||||
private abstract static class AbstractArrayWrapper extends AbstractArray implements BigArray {
|
||||
|
||||
static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ByteArrayWrapper.class);
|
||||
|
@ -369,24 +364,26 @@ public class BigArrays implements Releasable {
|
|||
}
|
||||
|
||||
final PageCacheRecycler recycler;
|
||||
final CircuitBreakerService breakerService;
|
||||
final boolean checkBreaker;
|
||||
private final CircuitBreakerService breakerService;
|
||||
private final boolean checkBreaker;
|
||||
private final BigArrays circuitBreakingInstance;
|
||||
private final String breakerName;
|
||||
|
||||
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
|
||||
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName) {
|
||||
// Checking the breaker is disabled if not specified
|
||||
this(recycler, breakerService, false);
|
||||
this(recycler, breakerService, breakerName, false);
|
||||
}
|
||||
|
||||
// public for tests
|
||||
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) {
|
||||
protected BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName,
|
||||
boolean checkBreaker) {
|
||||
this.checkBreaker = checkBreaker;
|
||||
this.recycler = recycler;
|
||||
this.breakerService = breakerService;
|
||||
this.breakerName = breakerName;
|
||||
if (checkBreaker) {
|
||||
this.circuitBreakingInstance = this;
|
||||
} else {
|
||||
this.circuitBreakingInstance = new BigArrays(recycler, breakerService, true);
|
||||
this.circuitBreakingInstance = new BigArrays(recycler, breakerService, breakerName, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -400,7 +397,7 @@ public class BigArrays implements Releasable {
|
|||
*/
|
||||
void adjustBreaker(final long delta, final boolean isDataAlreadyCreated) {
|
||||
if (this.breakerService != null) {
|
||||
CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.REQUEST);
|
||||
CircuitBreaker breaker = this.breakerService.getBreaker(breakerName);
|
||||
if (this.checkBreaker) {
|
||||
// checking breaker means potentially tripping, but it doesn't
|
||||
// have to if the delta is negative
|
||||
|
|
|
@ -59,6 +59,12 @@ public class PageCacheRecycler implements Releasable {
|
|||
private final Recycler<long[]> longPage;
|
||||
private final Recycler<Object[]> objectPage;
|
||||
|
||||
public static final PageCacheRecycler NON_RECYCLING_INSTANCE;
|
||||
|
||||
static {
|
||||
NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
Releasables.close(true, bytePage, intPage, longPage, objectPage);
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.elasticsearch.cluster.routing.RoutingService;
|
|||
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
|
@ -375,7 +376,7 @@ public class Node implements Closeable {
|
|||
|
||||
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
|
||||
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
|
||||
resourcesToClose.add(bigArrays);
|
||||
resourcesToClose.add(pageCacheRecycler);
|
||||
modules.add(settingsModule);
|
||||
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
|
||||
NetworkModule.getNamedWriteables().stream(),
|
||||
|
@ -516,6 +517,7 @@ public class Node implements Closeable {
|
|||
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
|
||||
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
|
||||
b.bind(BigArrays.class).toInstance(bigArrays);
|
||||
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
|
||||
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
|
||||
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
|
||||
b.bind(IngestService.class).toInstance(ingestService);
|
||||
|
@ -845,7 +847,7 @@ public class Node implements Closeable {
|
|||
|
||||
|
||||
toClose.add(injector.getInstance(NodeEnvironment.class));
|
||||
toClose.add(injector.getInstance(BigArrays.class));
|
||||
toClose.add(injector.getInstance(PageCacheRecycler.class));
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
|
||||
|
@ -928,7 +930,7 @@ public class Node implements Closeable {
|
|||
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
|
||||
*/
|
||||
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
|
||||
return new BigArrays(pageCacheRecycler, circuitBreakerService);
|
||||
return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -58,11 +58,9 @@ public interface NetworkPlugin {
|
|||
* Returns a map of {@link Transport} suppliers.
|
||||
* See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
|
||||
*/
|
||||
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.elasticsearch.common.transport.TransportAddress;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
|
@ -179,6 +180,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
private final Version version;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final BigArrays bigArrays;
|
||||
protected final PageCacheRecycler pageCacheRecycler;
|
||||
protected final NetworkService networkService;
|
||||
protected final Set<ProfileSettings> profileSettings;
|
||||
|
||||
|
@ -206,15 +208,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
private final TransportKeepAlive keepAlive;
|
||||
private final String nodeName;
|
||||
|
||||
public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, BigArrays bigArrays,
|
||||
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool,
|
||||
PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
super(settings);
|
||||
this.settings = settings;
|
||||
this.profileSettings = getProfileSettings(settings);
|
||||
this.version = version;
|
||||
this.threadPool = threadPool;
|
||||
this.bigArrays = bigArrays;
|
||||
this.bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
|
||||
this.pageCacheRecycler = pageCacheRecycler;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
|
||||
|
|
|
@ -131,7 +131,7 @@ public class NetworkModuleTests extends ESTestCase {
|
|||
Supplier<Transport> custom = () -> null; // content doesn't matter we check reference equality
|
||||
NetworkPlugin plugin = new NetworkPlugin() {
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
|
@ -187,7 +187,7 @@ public class NetworkModuleTests extends ESTestCase {
|
|||
Supplier<HttpServerTransport> def = FakeHttpTransport::new;
|
||||
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
|
@ -222,7 +222,7 @@ public class NetworkModuleTests extends ESTestCase {
|
|||
Supplier<Transport> customTransport = () -> null;
|
||||
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
|
|
|
@ -358,7 +358,7 @@ public class BigArraysTests extends ESTestCase {
|
|||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.build(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
BigArrays bigArrays = new BigArrays(null, hcbs, false).withCircuitBreaking();
|
||||
BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST).withCircuitBreaking();
|
||||
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
|
||||
final int size = scaledRandomIntBetween(10, maxSize / 16);
|
||||
BigArray array = (BigArray) create.invoke(bigArrays, size);
|
||||
|
@ -422,7 +422,7 @@ public class BigArraysTests extends ESTestCase {
|
|||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.build(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
BigArrays bigArrays = new BigArrays(null, hcbs, false);
|
||||
BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST);
|
||||
return (withBreaking ? bigArrays.withCircuitBreaking() : bigArrays);
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.lucene.util.SetOnce.AlreadySetException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -132,7 +133,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
threadPool = new TestThreadPool("test");
|
||||
circuitBreakerService = new NoneCircuitBreakerService();
|
||||
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
|
||||
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
|
||||
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
|
||||
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap());
|
||||
clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
nodeEnvironment = new NodeEnvironment(settings, environment);
|
||||
|
|
|
@ -32,8 +32,8 @@ import org.elasticsearch.common.lease.Releasable;
|
|||
import org.elasticsearch.common.network.CloseableChannel;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -191,7 +191,7 @@ public class TcpTransportTests extends ESTestCase {
|
|||
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
|
||||
try {
|
||||
TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool,
|
||||
new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) {
|
||||
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) {
|
||||
|
||||
@Override
|
||||
protected FakeServerChannel bind(String name, InetSocketAddress address) throws IOException {
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.common.bytes;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -37,7 +38,7 @@ import java.util.Arrays;
|
|||
public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
|
||||
|
||||
protected static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
|
||||
protected final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
|
||||
protected final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST);
|
||||
|
||||
public void testGet() throws IOException {
|
||||
int length = randomIntBetween(1, PAGE_SIZE * 3);
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.util.Accountable;
|
|||
import org.apache.lucene.util.Accountables;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -86,7 +87,7 @@ public class MockBigArrays extends BigArrays {
|
|||
}
|
||||
|
||||
private MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) {
|
||||
super(recycler, breakerService, checkBreaker);
|
||||
super(recycler, breakerService, CircuitBreaker.REQUEST, checkBreaker);
|
||||
this.recycler = recycler;
|
||||
this.breakerService = breakerService;
|
||||
long seed;
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
|
@ -113,8 +112,7 @@ public final class MockTransportService extends TransportService {
|
|||
settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
||||
return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry,
|
||||
new NoneCircuitBreakerService());
|
||||
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService());
|
||||
}
|
||||
|
||||
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
@ -98,15 +99,14 @@ public class MockTcpTransport extends TcpTransport {
|
|||
public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService,
|
||||
Version.CURRENT);
|
||||
this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT);
|
||||
}
|
||||
|
||||
public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService, Version mockVersion) {
|
||||
super("mock-tcp-transport", settings, mockVersion, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry,
|
||||
networkService);
|
||||
super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService,
|
||||
namedWriteableRegistry, networkService);
|
||||
// we have our own crazy cached threadpool this one is not bounded at all...
|
||||
// using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread
|
||||
executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX));
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.nio.BytesChannelContext;
|
||||
|
@ -69,16 +68,14 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
|
|||
public class MockNioTransport extends TcpTransport {
|
||||
private static final Logger logger = LogManager.getLogger(MockNioTransport.class);
|
||||
|
||||
private final PageCacheRecycler pageCacheRecycler;
|
||||
private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
|
||||
private volatile NioGroup nioGroup;
|
||||
private volatile MockTcpChannelFactory clientChannelFactory;
|
||||
|
||||
public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
|
||||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super("mock-nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
this.pageCacheRecycler = pageCacheRecycler;
|
||||
super("mock-nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.plugins.NetworkPlugin;
|
||||
|
@ -39,13 +38,11 @@ public class MockNioTransportPlugin extends Plugin implements NetworkPlugin {
|
|||
public static final String MOCK_NIO_TRANSPORT_NAME = "mock-nio";
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
return Collections.singletonMap(MOCK_NIO_TRANSPORT_NAME,
|
||||
() -> new MockNioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler,
|
||||
() -> new MockNioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler,
|
||||
namedWriteableRegistry, circuitBreakerService));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
@ -57,8 +56,8 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase
|
|||
ClusterSettings clusterSettings, boolean doHandshake) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||
Transport transport = new MockNioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
Transport transport = new MockNioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings),
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
|
||||
@Override
|
||||
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
|
||||
|
|
|
@ -16,7 +16,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
|
@ -461,7 +460,6 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
public Map<String, Supplier<Transport>> getTransports(
|
||||
final Settings settings,
|
||||
final ThreadPool threadPool,
|
||||
final BigArrays bigArrays,
|
||||
final PageCacheRecycler pageCacheRecycler,
|
||||
final CircuitBreakerService circuitBreakerService,
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
|
@ -477,7 +475,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4Transport(settings, Version.CURRENT, threadPool,
|
||||
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, sslService));
|
||||
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.CloseableChannel;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
|
@ -61,11 +61,11 @@ public class SecurityNetty4Transport extends Netty4Transport {
|
|||
final Version version,
|
||||
final ThreadPool threadPool,
|
||||
final NetworkService networkService,
|
||||
final BigArrays bigArrays,
|
||||
final PageCacheRecycler pageCacheRecycler,
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final CircuitBreakerService circuitBreakerService,
|
||||
final SSLService sslService) {
|
||||
super(settings, version, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
|
||||
this.sslService = sslService;
|
||||
this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
|
||||
if (sslEnabled) {
|
||||
|
|
|
@ -276,15 +276,13 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
Map<String, Supplier<Transport>> transports = new HashMap<>();
|
||||
transports.putAll(super.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
|
||||
circuitBreakerService, namedWriteableRegistry, networkService));
|
||||
filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getTransports(settings, threadPool, bigArrays,
|
||||
transports.putAll(super.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry,
|
||||
networkService));
|
||||
filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getTransports(settings, threadPool,
|
||||
pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService)));
|
||||
return transports;
|
||||
|
||||
|
|
|
@ -889,20 +889,18 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
|
||||
PageCacheRecycler pageCacheRecycler,
|
||||
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
NetworkService networkService) {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
|
||||
if (transportClientMode || enabled == false) { // don't register anything if we are not enabled, or in transport client mode
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
Map<String, Supplier<Transport>> transports = new HashMap<>();
|
||||
transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool,
|
||||
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
|
||||
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
|
||||
transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool,
|
||||
networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
|
||||
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
|
||||
|
||||
return Collections.unmodifiableMap(transports);
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
|
||||
|
@ -29,12 +29,12 @@ public class SecurityNetty4ServerTransport extends SecurityNetty4Transport {
|
|||
final Version version,
|
||||
final ThreadPool threadPool,
|
||||
final NetworkService networkService,
|
||||
final BigArrays bigArrays,
|
||||
final PageCacheRecycler pageCacheRecycler,
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final CircuitBreakerService circuitBreakerService,
|
||||
@Nullable final IPFilter authenticator,
|
||||
final SSLService sslService) {
|
||||
super(settings, version, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, sslService);
|
||||
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService);
|
||||
this.authenticator = authenticator;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@ import org.elasticsearch.common.network.CloseableChannel;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.nio.BytesChannelContext;
|
||||
|
@ -75,10 +74,10 @@ public class SecurityNioTransport extends NioTransport {
|
|||
private final boolean sslEnabled;
|
||||
|
||||
public SecurityNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
|
||||
BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator,
|
||||
SSLService sslService) {
|
||||
super(settings, version, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
|
||||
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
|
||||
this.authenticator = authenticator;
|
||||
this.sslService = sslService;
|
||||
this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
|
||||
|
|
|
@ -13,7 +13,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
|
@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.ssl.SSLService;
|
|||
import org.junit.Before;
|
||||
|
||||
import javax.net.ssl.SSLEngine;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
|
@ -72,7 +71,7 @@ public class SecurityNetty4ServerTransportTests extends ESTestCase {
|
|||
Version.CURRENT,
|
||||
mock(ThreadPool.class),
|
||||
new NetworkService(Collections.emptyList()),
|
||||
mock(BigArrays.class),
|
||||
mock(PageCacheRecycler.class),
|
||||
mock(NamedWriteableRegistry.class),
|
||||
mock(CircuitBreakerService.class),
|
||||
null,
|
||||
|
|
|
@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -34,7 +34,7 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu
|
|||
.put(settings)
|
||||
.put("xpack.security.transport.ssl.enabled", true).build();
|
||||
Transport transport = new SecurityNetty4ServerTransport(settings1, version, threadPool,
|
||||
networkService, BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
|
||||
networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
|
||||
new NoneCircuitBreakerService(), null, createSSLService(settings1)) {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
@ -34,9 +33,8 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans
|
|||
Settings settings1 = Settings.builder()
|
||||
.put(settings)
|
||||
.put("xpack.security.transport.ssl.enabled", true).build();
|
||||
Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService(), null,
|
||||
createSSLService(settings1)) {
|
||||
Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, new MockPageCacheRecycler(settings),
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) {
|
||||
|
||||
@Override
|
||||
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
|
||||
|
|
Loading…
Reference in New Issue