Implement byte array reusage in NioTransport (#27696)

This is related to #27563. This commit modifies the
InboundChannelBuffer to support releasable byte pages. These byte
pages are provided by the PageCacheRecycler. The PageCacheRecycler
must be passed to the Transport with this change.
This commit is contained in:
Tim Brooks 2017-12-08 10:39:30 -07:00 committed by GitHub
parent d21167e0c2
commit d82c40d35c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 280 additions and 91 deletions

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.InternalSettingsPreparer;
@ -169,11 +170,12 @@ public abstract class TransportClient extends AbstractClient {
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(),

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.http.HttpServerTransport;
@ -107,6 +108,7 @@ public final class NetworkModule {
*/
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
@ -121,9 +123,9 @@ public final class NetworkModule {
registerHttpTransport(entry.getKey(), entry.getValue());
}
}
Map<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
circuitBreakerService, namedWriteableRegistry, networkService);
for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
}
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,

View File

@ -372,9 +372,9 @@ public class BigArrays implements Releasable {
final boolean checkBreaker;
private final BigArrays circuitBreakingInstance;
public BigArrays(Settings settings, @Nullable final CircuitBreakerService breakerService) {
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
// Checking the breaker is disabled if not specified
this(new PageCacheRecycler(settings), breakerService, false);
this(recycler, breakerService, false);
}
// public for tests

View File

@ -65,10 +65,10 @@ public class PageCacheRecycler extends AbstractComponent implements Releasable {
Releasables.close(true, bytePage, intPage, longPage, objectPage);
}
protected PageCacheRecycler(Settings settings) {
public PageCacheRecycler(Settings settings) {
super(settings);
final Type type = TYPE_SETTING .get(settings);
final long limit = LIMIT_HEAP_SETTING .get(settings).getBytes();
final Type type = TYPE_SETTING.get(settings);
final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
// We have a global amount of memory that we need to divide across data types.

View File

@ -80,6 +80,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
@ -363,7 +364,8 @@ public class Node implements Closeable {
modules.add(new GatewayModule());
BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
@ -403,7 +405,8 @@ public class Node implements Closeable {
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController);
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataUpgrader)
@ -898,8 +901,16 @@ public class Node implements Closeable {
* Creates a new {@link BigArrays} instance used for this node.
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
*/
BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreakerService) {
return new BigArrays(settings, circuitBreakerService);
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
return new BigArrays(pageCacheRecycler, circuitBreakerService);
}
/**
* Creates a new {@link BigArrays} instance used for this node.
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
*/
PageCacheRecycler createPageCacheRecycler(Settings settings) {
return new PageCacheRecycler(settings);
}
/**

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpServerTransport;
@ -58,6 +59,7 @@ public interface NetworkPlugin {
* See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
*/
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
@ -30,7 +31,7 @@ public class ReleasableBytesStreamOutputTests extends ESTestCase {
public void testRelease() throws Exception {
MockBigArrays mockBigArrays =
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
try (ReleasableBytesStreamOutput output =
getRandomReleasableBytesStreamOutput(mockBigArrays)) {
output.writeBoolean(randomBoolean());

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpInfo;
@ -133,6 +134,7 @@ public class NetworkModuleTests extends ModuleTestCase {
NetworkPlugin plugin = new NetworkPlugin() {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -193,6 +195,7 @@ public class NetworkModuleTests extends ModuleTestCase {
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -227,6 +230,7 @@ public class NetworkModuleTests extends ModuleTestCase {
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -306,7 +310,7 @@ public class NetworkModuleTests extends ModuleTestCase {
}
private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) {
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null,
new NullDispatcher());
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, null,
xContentRegistry(), null, new NullDispatcher());
}
}

View File

@ -42,7 +42,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class BigArraysTests extends ESTestCase {
private BigArrays randombigArrays() {
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
private BigArrays bigArrays;

View File

@ -41,7 +41,7 @@ public class BytesRefHashTests extends ESSingleNodeTestCase {
BytesRefHash hash;
private BigArrays randombigArrays() {
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
private void newHash() {

View File

@ -36,7 +36,7 @@ public class LongHashTests extends ESSingleNodeTestCase {
LongHash hash;
private BigArrays randombigArrays() {
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
private void newHash() {

View File

@ -27,7 +27,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
public class LongObjectHashMapTests extends ESSingleNodeTestCase {
private BigArrays randombigArrays() {
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
public void testDuel() {

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
@ -124,7 +125,8 @@ public class IndexModuleTests extends ESTestCase {
emptyMap(), emptyMap(), emptyMap());
threadPool = new TestThreadPool("test");
circuitBreakerService = new NoneCircuitBreakerService();
bigArrays = new BigArrays(settings, circuitBreakerService);
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap());
clusterService = ClusterServiceUtils.createClusterService(threadPool);
nodeEnvironment = new NodeEnvironment(settings, environment);

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.IndexCache;
@ -104,7 +105,7 @@ public class DefaultSearchContextTests extends ESTestCase {
when(indexService.getIndexSettings()).thenReturn(indexSettings);
when(mapperService.getIndexSettings()).thenReturn(indexSettings);
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
try (Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);

View File

@ -25,6 +25,7 @@ import org.apache.lucene.search.Scorer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
@ -46,7 +47,7 @@ public class MultiBucketAggregatorWrapperTests extends ESTestCase {
public void testNoNullScorerIsDelegated() throws Exception {
LeafReaderContext leafReaderContext = MemoryIndex.fromDocument(Collections.emptyList(), new MockAnalyzer(random()))
.createSearcher().getIndexReader().leaves().get(0);
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
SearchContext searchContext = mock(SearchContext.class);
when(searchContext.bigArrays()).thenReturn(bigArrays);

View File

@ -33,6 +33,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketCollector;
@ -63,8 +64,8 @@ public class BestDocsDeferringCollectorTests extends AggregatorTestCase {
TermQuery termQuery = new TermQuery(new Term("field", String.valueOf(randomInt(maxNumValues))));
TopDocs topDocs = indexSearcher.search(termQuery, numDocs);
BestDocsDeferringCollector collector =
new BestDocsDeferringCollector(numDocs, new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()));
BestDocsDeferringCollector collector = new BestDocsDeferringCollector(numDocs,
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()));
Set<Integer> deferredCollectedDocIds = new HashSet<>();
collector.setDeferredCollector(Collections.singleton(testCollector(deferredCollectedDocIds)));
collector.preCollection();

View File

@ -38,6 +38,7 @@ import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.IpFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
@ -912,7 +913,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
dir.close();
}
InternalAggregation.ReduceContext ctx =
new InternalAggregation.ReduceContext(new MockBigArrays(Settings.EMPTY,
new InternalAggregation.ReduceContext(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY),
new NoneCircuitBreakerService()), null, true);
for (InternalAggregation internalAgg : aggs) {
InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -60,7 +61,7 @@ public class InternalCardinalityTests extends InternalAggregationTestCase<Intern
protected InternalCardinality createTestInstance(String name,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
HyperLogLogPlusPlus hllpp = new HyperLogLogPlusPlus(p,
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()), 1);
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), 1);
algos.add(hllpp);
for (int i = 0; i < 100; i++) {
hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100)));
@ -107,7 +108,7 @@ public class InternalCardinalityTests extends InternalAggregationTestCase<Intern
break;
case 1:
HyperLogLogPlusPlus newState = new HyperLogLogPlusPlus(state.precision(),
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()), 0);
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), 0);
newState.merge(0, state, 0);
int extraValues = between(10, 100);
for (int i = 0; i < extraValues; i++) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
@ -177,7 +178,7 @@ public class TcpTransportTests extends ESTestCase {
try {
TcpTransport transport = new TcpTransport(
"test", Settings.builder().put("transport.tcp.compress", compressed).build(), threadPool,
new BigArrays(Settings.EMPTY, null), null, null, null) {
new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) {
@Override
protected FakeChannel bind(String name, InetSocketAddress address) throws IOException {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -160,7 +161,7 @@ public class InternalMatrixStatsTests extends InternalAggregationTestCase<Intern
multiPassStats.computeStats(aValues, bValues);
ScriptService mockScriptService = mockScriptService();
MockBigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, true);
InternalMatrixStats reduced = (InternalMatrixStats) shardResults.get(0).reduce(shardResults, context);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
@ -76,6 +77,7 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -99,7 +100,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
public void setup() throws Exception {
networkService = new NetworkService(Collections.emptyList());
threadPool = new TestThreadPool("test");
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
@After

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.NullDispatcher;
@ -74,7 +75,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
public void setup() throws Exception {
networkService = new NetworkService(Collections.emptyList());
threadPool = new TestThreadPool("test");
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
@After

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpServerTransport;
@ -103,7 +104,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
public void setup() throws Exception {
networkService = new NetworkService(Collections.emptyList());
threadPool = new TestThreadPool("test");
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
@After

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.test.ESTestCase;
@ -63,7 +64,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
public void startThreadPool() {
threadPool = new ThreadPool(settings);
NetworkService networkService = new NetworkService(Collections.emptyList());
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays,
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
nettyTransport.start();

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -90,6 +91,7 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@ -116,7 +117,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
}
private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
transport.start();

View File

@ -70,8 +70,8 @@ public class MockBigArrays extends BigArrays {
private final PageCacheRecycler recycler;
private final CircuitBreakerService breakerService;
public MockBigArrays(Settings settings, CircuitBreakerService breakerService) {
this(new MockPageCacheRecycler(settings), breakerService, false);
public MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService) {
this(recycler, breakerService, false);
}
private MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) {

View File

@ -57,7 +57,7 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
private final Random random;
MockPageCacheRecycler(Settings settings) {
public MockPageCacheRecycler(Settings settings) {
super(settings);
// we always initialize with 0 here since we really only wanna have some random bytes / ints / longs
// and given the fact that it's called concurrently it won't reproduces anyway the same order other than in a unittest

View File

@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -82,11 +84,19 @@ public class MockNode extends Node {
}
@Override
protected BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreakerService) {
protected BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
if (getPluginsService().filterPlugins(NodeMocksPlugin.class).isEmpty()) {
return super.createBigArrays(settings, circuitBreakerService);
return super.createBigArrays(pageCacheRecycler, circuitBreakerService);
}
return new MockBigArrays(settings, circuitBreakerService);
return new MockBigArrays(pageCacheRecycler, circuitBreakerService);
}
@Override
PageCacheRecycler createPageCacheRecycler(Settings settings) {
if (getPluginsService().filterPlugins(NodeMocksPlugin.class).isEmpty()) {
return super.createPageCacheRecycler(settings);
}
return new MockPageCacheRecycler(settings);
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@ -112,7 +113,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
when(searchContext.aggregations())
.thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer));
when(searchContext.bigArrays()).thenReturn(new MockBigArrays(Settings.EMPTY, circuitBreakerService));
when(searchContext.bigArrays()).thenReturn(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), circuitBreakerService));
// TODO: now just needed for top_hits, this will need to be revised for other agg unit tests:
MapperService mapperService = mapperServiceMock();
when(mapperService.getIndexSettings()).thenReturn(indexSettings);

View File

@ -2076,7 +2076,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
try {
INSTANCE.printTestMessage("cleaning up after");
INSTANCE.afterInternal(true);
checkStaticState();
checkStaticState(true);
} finally {
INSTANCE = null;
}

View File

@ -288,7 +288,7 @@ public abstract class ESTestCase extends LuceneTestCase {
@After
public final void after() throws Exception {
checkStaticState();
checkStaticState(false);
// We check threadContext != null rather than enableWarningsCheck()
// because after methods are still called in the event that before
// methods failed, in which case threadContext might not have been
@ -394,8 +394,10 @@ public abstract class ESTestCase extends LuceneTestCase {
}
// separate method so that this can be checked again after suite scoped cluster is shut down
protected static void checkStaticState() throws Exception {
MockPageCacheRecycler.ensureAllPagesAreReleased();
protected static void checkStaticState(boolean afterClass) throws Exception {
if (afterClass) {
MockPageCacheRecycler.ensureAllPagesAreReleased();
}
MockBigArrays.ensureAllArraysAreReleased();
// ensure no one changed the status logger level on us

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
@ -236,7 +237,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
toReduce.add(t);
}
ScriptService mockScriptService = mockScriptService();
MockBigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
if (randomBoolean() && toReduce.size() > 1) {
// sometimes do an incremental reduce
Collections.shuffle(toReduce, random());

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -39,6 +40,7 @@ public class MockTcpTransportPlugin extends Plugin implements NetworkPlugin {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {

View File

@ -19,9 +19,16 @@
package org.elasticsearch.transport.nio;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
@ -30,39 +37,56 @@ import java.util.function.Supplier;
* the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can
* be called and the buffer will expand using the supplier provided.
*/
public final class InboundChannelBuffer {
public final class InboundChannelBuffer implements Releasable {
private static final int PAGE_SIZE = 1 << 14;
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
private static final int PAGE_MASK = PAGE_SIZE - 1;
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
private final ArrayDeque<ByteBuffer> pages;
private final Supplier<ByteBuffer> pageSupplier;
private final ArrayDeque<Page> pages;
private final Supplier<Page> pageSupplier;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private long capacity = 0;
private long internalIndex = 0;
// The offset is an int as it is the offset of where the bytes begin in the first buffer
private int offset = 0;
public InboundChannelBuffer() {
this(() -> ByteBuffer.wrap(new byte[PAGE_SIZE]));
}
private InboundChannelBuffer(Supplier<ByteBuffer> pageSupplier) {
public InboundChannelBuffer(Supplier<Page> pageSupplier) {
this.pageSupplier = pageSupplier;
this.pages = new ArrayDeque<>();
this.capacity = PAGE_SIZE * pages.size();
ensureCapacity(PAGE_SIZE);
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
Page page;
List<RuntimeException> closingExceptions = new ArrayList<>();
while ((page = pages.pollFirst()) != null) {
try {
page.close();
} catch (RuntimeException e) {
closingExceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
}
}
public void ensureCapacity(long requiredCapacity) {
if (isClosed.get()) {
throw new IllegalStateException("Cannot allocate new pages if the buffer is closed.");
}
if (capacity < requiredCapacity) {
int numPages = numPages(requiredCapacity + offset);
int pagesToAdd = numPages - pages.size();
for (int i = 0; i < pagesToAdd; i++) {
pages.addLast(pageSupplier.get());
Page page = pageSupplier.get();
pages.addLast(page);
}
capacity += pagesToAdd * PAGE_SIZE;
}
@ -81,7 +105,7 @@ public final class InboundChannelBuffer {
int pagesToRelease = pageIndex(offset + bytesToRelease);
for (int i = 0; i < pagesToRelease; i++) {
pages.removeFirst();
pages.removeFirst().close();
}
capacity -= bytesToRelease;
internalIndex = Math.max(internalIndex - bytesToRelease, 0);
@ -112,12 +136,12 @@ public final class InboundChannelBuffer {
}
ByteBuffer[] buffers = new ByteBuffer[pageCount];
Iterator<ByteBuffer> pageIterator = pages.iterator();
ByteBuffer firstBuffer = pageIterator.next().duplicate();
Iterator<Page> pageIterator = pages.iterator();
ByteBuffer firstBuffer = pageIterator.next().byteBuffer.duplicate();
firstBuffer.position(firstBuffer.position() + offset);
buffers[0] = firstBuffer;
for (int i = 1; i < buffers.length; i++) {
buffers[i] = pageIterator.next().duplicate();
buffers[i] = pageIterator.next().byteBuffer.duplicate();
}
if (finalLimit != 0) {
buffers[buffers.length - 1].limit(finalLimit);
@ -148,11 +172,11 @@ public final class InboundChannelBuffer {
int indexInPage = indexInPage(indexWithOffset);
ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex];
Iterator<ByteBuffer> pageIterator = pages.descendingIterator();
Iterator<Page> pageIterator = pages.descendingIterator();
for (int i = buffers.length - 1; i > 0; --i) {
buffers[i] = pageIterator.next().duplicate();
buffers[i] = pageIterator.next().byteBuffer.duplicate();
}
ByteBuffer firstPostIndexBuffer = pageIterator.next().duplicate();
ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer.duplicate();
firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage);
buffers[0] = firstPostIndexBuffer;
@ -201,4 +225,21 @@ public final class InboundChannelBuffer {
private int indexInPage(long index) {
return (int) (index & PAGE_MASK);
}
public static class Page implements Releasable {
private final ByteBuffer byteBuffer;
private final Releasable releasable;
public Page(ByteBuffer byteBuffer, Releasable releasable) {
this.byteBuffer = byteBuffer;
this.releasable = releasable;
}
@Override
public void close() {
releasable.close();
}
}
}

View File

@ -24,10 +24,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
@ -44,6 +46,7 @@ import org.elasticsearch.transport.nio.channel.TcpWriteContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
@ -68,6 +71,7 @@ public class NioTransport extends TcpTransport {
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
private final OpenChannels openChannels = new OpenChannels(logger);
private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>();
@ -76,8 +80,10 @@ public class NioTransport extends TcpTransport {
private int acceptorNumber;
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
this.pageCacheRecycler = pageCacheRecycler;
}
@Override
@ -184,8 +190,14 @@ public class NioTransport extends TcpTransport {
}
private Consumer<NioSocketChannel> getContextSetter(String profileName) {
return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c),
this::exceptionCaught);
return (c) -> {
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes);
};
c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), new InboundChannelBuffer(pageSupplier)),
new TcpWriteContext(c), this::exceptionCaught);
};
}
private void acceptChannel(NioSocketChannel channel) {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -38,6 +39,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -49,6 +51,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
settings1 = settings;
}
return Collections.singletonMap(NIO_TRANSPORT_NAME,
() -> new NioTransport(settings1, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
() -> new NioTransport(settings1, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
circuitBreakerService));
}
}

View File

@ -56,6 +56,7 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
if (writeContext.hasQueuedWriteOps()) {
writeContext.clearQueuedWriteOps(new ClosedChannelException());
}
readContext.close();
super.closeFromSelector();
}

View File

@ -21,8 +21,11 @@ package org.elasticsearch.transport.nio.channel;
import java.io.IOException;
public interface ReadContext {
public interface ReadContext extends AutoCloseable {
int read() throws IOException;
@Override
void close();
}

View File

@ -32,17 +32,13 @@ public class TcpReadContext implements ReadContext {
private final TcpReadHandler handler;
private final TcpNioSocketChannel channel;
private final TcpFrameDecoder frameDecoder;
private final InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
private final InboundChannelBuffer channelBuffer;
private final TcpFrameDecoder frameDecoder = new TcpFrameDecoder();
public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler) {
this((TcpNioSocketChannel) channel, handler, new TcpFrameDecoder());
}
public TcpReadContext(TcpNioSocketChannel channel, TcpReadHandler handler, TcpFrameDecoder frameDecoder) {
public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler, InboundChannelBuffer channelBuffer) {
this.handler = handler;
this.channel = channel;
this.frameDecoder = frameDecoder;
this.channel = (TcpNioSocketChannel) channel;
this.channelBuffer = channelBuffer;
}
@Override
@ -82,6 +78,11 @@ public class TcpReadContext implements ReadContext {
return bytesRead;
}
@Override
public void close() {
channelBuffer.close();
}
private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) {
ByteBuffer[] writtenToBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex());
ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length];
@ -91,5 +92,4 @@ public class TcpReadContext implements ReadContext {
return new CompositeBytesReference(references);
}
}

View File

@ -19,16 +19,22 @@
package org.elasticsearch.transport.nio;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class InboundChannelBufferTests extends ESTestCase {
private static final int PAGE_SIZE = 1 << 14;
private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES;
private final Supplier<InboundChannelBuffer.Page> defaultPageSupplier = () ->
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {});
public void testNewBufferHasSinglePage() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -36,7 +42,7 @@ public class InboundChannelBufferTests extends ESTestCase {
}
public void testExpandCapacity() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -48,7 +54,7 @@ public class InboundChannelBufferTests extends ESTestCase {
}
public void testExpandCapacityMultiplePages() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
@ -60,7 +66,7 @@ public class InboundChannelBufferTests extends ESTestCase {
}
public void testExpandCapacityRespectsOffset() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -79,7 +85,7 @@ public class InboundChannelBufferTests extends ESTestCase {
}
public void testIncrementIndex() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(0, channelBuffer.getIndex());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -91,7 +97,7 @@ public class InboundChannelBufferTests extends ESTestCase {
}
public void testIncrementIndexWithOffset() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(0, channelBuffer.getIndex());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -109,8 +115,60 @@ public class InboundChannelBufferTests extends ESTestCase {
assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining());
}
public void testReleaseClosesPages() {
ConcurrentLinkedQueue<AtomicBoolean> queue = new ConcurrentLinkedQueue<>();
Supplier<InboundChannelBuffer.Page> supplier = () -> {
AtomicBoolean atomicBoolean = new AtomicBoolean();
queue.add(atomicBoolean);
return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true));
};
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier);
channelBuffer.ensureCapacity(PAGE_SIZE * 4);
assertEquals(PAGE_SIZE * 4, channelBuffer.getCapacity());
assertEquals(4, queue.size());
for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}
channelBuffer.release(2 * PAGE_SIZE);
assertEquals(PAGE_SIZE * 2, channelBuffer.getCapacity());
assertTrue(queue.poll().get());
assertTrue(queue.poll().get());
assertFalse(queue.poll().get());
assertFalse(queue.poll().get());
}
public void testClose() {
ConcurrentLinkedQueue<AtomicBoolean> queue = new ConcurrentLinkedQueue<>();
Supplier<InboundChannelBuffer.Page> supplier = () -> {
AtomicBoolean atomicBoolean = new AtomicBoolean();
queue.add(atomicBoolean);
return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true));
};
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier);
channelBuffer.ensureCapacity(PAGE_SIZE * 4);
assertEquals(4, queue.size());
for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}
channelBuffer.close();
for (AtomicBoolean closedRef : queue) {
assertTrue(closedRef.get());
}
expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1));
}
public void testAccessByteBuffers() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
int pages = randomInt(50) + 5;
channelBuffer.ensureCapacity(pages * PAGE_SIZE);

View File

@ -28,6 +28,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService;
@ -57,8 +59,8 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
NetworkService networkService = new NetworkService(Collections.emptyList());
Transport transport = new NioTransport(settings, threadPool,
networkService,
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry,
new NoneCircuitBreakerService()) {
@Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio.channel;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.InboundChannelBuffer;
import org.elasticsearch.transport.nio.TcpReadHandler;
@ -30,6 +31,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@ -45,12 +47,14 @@ public class TcpReadContextTests extends ESTestCase {
private TcpReadContext readContext;
@Before
public void init() throws IOException {
public void init() {
handler = mock(TcpReadHandler.class);
messageLength = randomInt(96) + 4;
channel = mock(TcpNioSocketChannel.class);
readContext = new TcpReadContext(channel, handler);
Supplier<InboundChannelBuffer.Page> pageSupplier = () ->
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {});
readContext = new TcpReadContext(channel, handler, new InboundChannelBuffer(pageSupplier));
}
public void testSuccessfulRead() throws IOException {
@ -122,6 +126,15 @@ public class TcpReadContextTests extends ESTestCase {
}
}
public void closeClosesChannelBuffer() {
InboundChannelBuffer buffer = mock(InboundChannelBuffer.class);
TcpReadContext readContext = new TcpReadContext(channel, handler, buffer);
readContext.close();
verify(buffer).close();
}
private static byte[] combineMessageAndHeader(byte[] bytes) {
return combineMessageAndHeader(bytes, bytes.length);
}