Move netty transport and http into a module

This moves all netty code and it's dependency into a module.
This commit is contained in:
Simon Willnauer 2016-07-11 22:00:45 +02:00
parent 47bd2f9ca5
commit 048e4416e7
57 changed files with 384 additions and 197 deletions

View File

@ -853,7 +853,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocatorTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReusePeerRecoverySharedTest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]get[/\\]GetActionIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpServerPipeliningTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexModuleTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexServiceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexWithShadowReplicasIT.java" checks="LineLength" />

View File

@ -74,8 +74,6 @@ dependencies {
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
// network stack
compile 'io.netty:netty:3.10.6.Final'
// percentiles aggregation
compile 'com.tdunning:t-digest:3.0'
// precentil ranks aggregation
@ -152,26 +150,11 @@ processResources {
}
thirdPartyAudit.excludes = [
// uses internal java api: sun.security.x509 (X509CertInfo, X509CertImpl, X500Name)
'org.jboss.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
// classes are missing!
// from com.fasterxml.jackson.dataformat.yaml.YAMLMapper (jackson-dataformat-yaml)
'com.fasterxml.jackson.databind.ObjectMapper',
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder (netty)
'com.google.protobuf.CodedInputStream',
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender (netty)
'com.google.protobuf.CodedOutputStream',
// from org.jboss.netty.handler.codec.protobuf.ProtobufDecoder (netty)
'com.google.protobuf.ExtensionRegistry',
'com.google.protobuf.MessageLite$Builder',
'com.google.protobuf.MessageLite',
'com.google.protobuf.Parser',
// from org.apache.log4j.receivers.net.JMSReceiver (log4j-extras)
'javax.jms.Message',
'javax.jms.MessageListener',
@ -196,72 +179,8 @@ thirdPartyAudit.excludes = [
'javax.mail.internet.MimeMessage',
'javax.mail.internet.MimeMultipart',
'javax.mail.internet.MimeUtility',
// from org.jboss.netty.channel.socket.http.HttpTunnelingServlet (netty)
'javax.servlet.ServletConfig',
'javax.servlet.ServletException',
'javax.servlet.ServletOutputStream',
'javax.servlet.http.HttpServlet',
'javax.servlet.http.HttpServletRequest',
'javax.servlet.http.HttpServletResponse',
// from org.jboss.netty.logging.CommonsLoggerFactory (netty)
'org.apache.commons.logging.Log',
'org.apache.commons.logging.LogFactory',
// from org.jboss.netty.handler.ssl.OpenSslEngine (netty)
'org.apache.tomcat.jni.Buffer',
'org.apache.tomcat.jni.Library',
'org.apache.tomcat.jni.Pool',
'org.apache.tomcat.jni.SSL',
'org.apache.tomcat.jni.SSLContext',
// from org.jboss.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
'org.bouncycastle.asn1.x500.X500Name',
'org.bouncycastle.cert.X509v3CertificateBuilder',
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
'org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder',
'org.bouncycastle.jce.provider.BouncyCastleProvider',
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
// from org.jboss.netty.handler.ssl.JettyNpnSslEngine (netty)
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
'org.eclipse.jetty.npn.NextProtoNego',
// from org.jboss.netty.logging.JBossLoggerFactory (netty)
'org.jboss.logging.Logger',
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
'org.jboss.marshalling.ByteInput',
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
'org.jboss.marshalling.ByteOutput',
// from org.jboss.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
'org.jboss.marshalling.Marshaller',
// from org.jboss.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
'org.jboss.marshalling.MarshallerFactory',
'org.jboss.marshalling.MarshallingConfiguration',
'org.jboss.marshalling.Unmarshaller',
// from org.locationtech.spatial4j.io.GeoJSONReader (spatial4j)
'org.noggit.JSONParser',
// from org.jboss.netty.container.osgi.NettyBundleActivator (netty)
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
// from org.jboss.netty.logging.OsgiLoggerFactory$1 (netty)
'org.osgi.framework.ServiceReference',
'org.osgi.service.log.LogService',
'org.osgi.util.tracker.ServiceTracker',
'org.osgi.util.tracker.ServiceTrackerCustomizer',
// from org.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional
'org.slf4j.Logger',
'org.slf4j.LoggerFactory',
]
// dependency license are currently checked in distribution

View File

@ -19,11 +19,9 @@
package org.elasticsearch.common.compress;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;

View File

@ -20,8 +20,6 @@
package org.elasticsearch.common.network;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
@ -41,13 +39,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;
/**
* A module to handle registering and binding all network related classes.
@ -91,14 +87,9 @@ public class NetworkModule extends AbstractModule {
this.namedWriteableRegistry = namedWriteableRegistry;
registerTransportService(NETTY_TRANSPORT, TransportService.class);
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
registerTaskStatus(RawTaskStatus.NAME, RawTaskStatus::new);
registerBuiltinAllocationCommands();
if (transportClient == false) {
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
}
}
public boolean isTransportClient() {
@ -185,4 +176,8 @@ public class NetworkModule extends AbstractModule {
AllocateStalePrimaryAllocationCommand.COMMAND_NAME_FIELD);
}
public boolean canRegisterHttpExtensions() {
return transportClient == false;
}
}

View File

@ -61,7 +61,6 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndexingMemoryController;
@ -92,7 +91,6 @@ import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherService;
@ -241,18 +239,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH,
HttpTransportSettings.SETTING_HTTP_RESET_COOKIES,
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
NettyHttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE,
NettyHttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_MIN,
NettyHttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_MAX,
NettyHttpServerTransport.SETTING_HTTP_WORKER_COUNT,
NettyHttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
NettyHttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
NettyHttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
NettyHttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
NettyHttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
NettyHttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
@ -278,7 +264,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
TransportSettings.BIND_HOST,
TransportSettings.PUBLISH_PORT,
TransportSettings.PORT,
NettyTransport.WORKER_COUNT,
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
TcpTransport.CONNECTIONS_PER_NODE_BULK,
TcpTransport.CONNECTIONS_PER_NODE_REG,
@ -287,13 +272,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
TcpTransport.PING_SCHEDULE,
TcpTransport.TCP_BLOCKING_CLIENT,
TcpTransport.TCP_CONNECT_TIMEOUT,
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
NettyTransport.NETTY_RECEIVE_PREDICTOR_MIN,
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
NetworkService.NETWORK_SERVER,
NettyTransport.NETTY_BOSS_COUNT,
TcpTransport.TCP_NO_DELAY,
TcpTransport.TCP_KEEP_ALIVE,
TcpTransport.TCP_REUSE_ADDRESS,

View File

@ -39,10 +39,10 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty.NettyTransport;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentMap;
@ -198,13 +198,8 @@ public class UnicastZenPingIT extends ESTestCase {
private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId,
Version version) {
NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(), new NoneCircuitBreakerService()) {
@Override
protected Version getCurrentVersion() {
return version;
}
};
MockTcpTransport transport = new MockTcpTransport("mock", settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), new NamedWriteableRegistry(), networkService, version);
final TransportService transportService = new TransportService(settings, transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -20,7 +20,6 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
@ -32,7 +31,6 @@ import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -45,27 +43,27 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
public class NettyTransportServiceHandshakeTests extends ESTestCase {
public class TransportServiceHandshakeTests extends ESTestCase {
private static ThreadPool threadPool;
private static final long timeout = Long.MAX_VALUE;
@BeforeClass
public static void startThreadPool() {
threadPool = new TestThreadPool(NettyTransportServiceHandshakeTests.class.getSimpleName());
threadPool = new TestThreadPool(TransportServiceHandshakeTests.class.getSimpleName());
}
private List<TransportService> transportServices = new ArrayList<>();
private NetworkHandle startServices(String nodeNameAndId, Settings settings, Version version) {
NettyTransport transport =
new NettyTransport(
MockTcpTransport transport =
new MockTcpTransport("mock",
settings,
threadPool,
new NetworkService(settings),
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(),
new NoneCircuitBreakerService());
new NetworkService(settings));
TransportService transportService = new MockTransportService(settings, transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -49,7 +49,11 @@ ext.dependencyFiles = project(':core').configurations.runtime.copyRecursive()
task buildModules(type: Sync) {
into 'build/modules'
}
}
task buildTransport(type: Sync) {
into 'build/modules'
}
ext.restTestExpansions = [
'expected.modules.count': 0,
@ -82,6 +86,20 @@ project.rootProject.subprojects.findAll { it.path.startsWith(':modules:') }.each
restTestExpansions['expected.modules.count'] += 1
}
project.rootProject.subprojects.findAll { it.path.startsWith(':modules:transport-') }.each { Project module ->
buildTransport {
dependsOn({ project(module.path).bundlePlugin })
into(module.name) {
from { zipTree(project(module.path).bundlePlugin.outputs.files.singleFile) }
}
}
module.afterEvaluate({
module.integTest.mustRunAfter(':distribution:integ-test-zip:integTest#stop')
})
restTestExpansions['expected.modules.count'] += 1
}
// make sure we have a clean task since we aren't a java project, but we have tasks that
// put stuff in the build dir
task clean(type: Delete) {
@ -145,6 +163,10 @@ subprojects {
from project(':distribution').buildModules
}
transportFiles = copySpec {
into 'modules'
from project(':distribution').buildTransport
}
configFiles = copySpec {
from '../src/main/resources/config'
MavenFilteringHack.filter(it, expansions)
@ -209,6 +231,8 @@ configure(subprojects.findAll { ['zip', 'tar', 'integ-test-zip'].contains(it.nam
}
if (project.name != 'integ-test-zip') {
with modulesFiles
} else {
with transportFiles
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
TODOs:
* fix permissions such that only netty can open sockets etc?
* fix the hack in the build framework that copies transport-netty into the integ test cluster
* maybe figure out a way to run all tests from core with netty/network?
*/
esplugin {
description 'Netty 3 based transport implementation'
classname 'org.elasticsearch.transport.NettyPlugin'
}
compileTestJava.options.compilerArgs << "-Xlint:-cast,-deprecation,-rawtypes,-try,-unchecked"
dependencies {
// network stack
compile 'io.netty:netty:3.10.6.Final'
}
thirdPartyAudit.excludes = [
// uses internal java api: sun.security.x509 (X509CertInfo, X509CertImpl, X500Name)
'org.jboss.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
// classes are missing
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder (netty)
'com.google.protobuf.CodedInputStream',
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender (netty)
'com.google.protobuf.CodedOutputStream',
// from org.jboss.netty.handler.codec.protobuf.ProtobufDecoder (netty)
'com.google.protobuf.ExtensionRegistry',
'com.google.protobuf.MessageLite$Builder',
'com.google.protobuf.MessageLite',
'com.google.protobuf.Parser',
// from org.jboss.netty.channel.socket.http.HttpTunnelingServlet (netty)
'javax.servlet.ServletConfig',
'javax.servlet.ServletException',
'javax.servlet.ServletOutputStream',
'javax.servlet.http.HttpServlet',
'javax.servlet.http.HttpServletRequest',
'javax.servlet.http.HttpServletResponse',
// from org.jboss.netty.logging.CommonsLoggerFactory (netty)
'org.apache.commons.logging.Log',
'org.apache.commons.logging.LogFactory',
// from org.jboss.netty.handler.ssl.OpenSslEngine (netty)
'org.apache.tomcat.jni.Buffer',
'org.apache.tomcat.jni.Library',
'org.apache.tomcat.jni.Pool',
'org.apache.tomcat.jni.SSL',
'org.apache.tomcat.jni.SSLContext',
// from org.jboss.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
'org.bouncycastle.asn1.x500.X500Name',
'org.bouncycastle.cert.X509v3CertificateBuilder',
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
'org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder',
'org.bouncycastle.jce.provider.BouncyCastleProvider',
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
// from org.jboss.netty.handler.ssl.JettyNpnSslEngine (netty)
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
'org.eclipse.jetty.npn.NextProtoNego',
// from org.jboss.netty.logging.JBossLoggerFactory (netty)
'org.jboss.logging.Logger',
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
'org.jboss.marshalling.ByteInput',
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
'org.jboss.marshalling.ByteOutput',
// from org.jboss.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
'org.jboss.marshalling.Marshaller',
// from org.jboss.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
'org.jboss.marshalling.MarshallerFactory',
'org.jboss.marshalling.MarshallingConfiguration',
'org.jboss.marshalling.Unmarshaller',
// from org.jboss.netty.container.osgi.NettyBundleActivator (netty)
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
// from org.jboss.netty.logging.OsgiLoggerFactory$1 (netty)
'org.osgi.framework.ServiceReference',
'org.osgi.service.log.LogService',
'org.osgi.util.tracker.ServiceTracker',
'org.osgi.util.tracker.ServiceTrackerCustomizer',
// from org.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional
'org.slf4j.Logger',
'org.slf4j.LoggerFactory',
]

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty.NettyTransport;
import java.util.Arrays;
import java.util.List;
public class NettyPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
NettyHttpServerTransport.SETTING_HTTP_WORKER_COUNT,
NettyHttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
NettyHttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
NettyHttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
NettyHttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
NettyHttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
NettyHttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
NettyTransport.WORKER_COUNT,
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
NettyTransport.NETTY_RECEIVE_PREDICTOR_MIN,
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
NettyTransport.NETTY_BOSS_COUNT
);
}
public void onModule(NetworkModule networkModule) {
if (networkModule.canRegisterHttpExtensions()) {
networkModule.registerHttpTransport("netty", NettyHttpServerTransport.class);
}
networkModule.registerTransport("netty", NettyTransport.class);
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
grant {
// TODO add netty specific permissions
};

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.NettyPlugin;
import org.elasticsearch.transport.netty.NettyTransport;
import java.util.Collection;
@ESIntegTestCase.SuppressLocalMode
public abstract class ESNettyIntegTestCase extends ESIntegTestCase {
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected boolean addMockTransportService() {
return false;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
// randomize netty settings
if (randomBoolean()) {
builder.put(NettyTransport.WORKER_COUNT.getKey(), random().nextInt(3) + 1);
}
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty");
builder.put(NetworkModule.HTTP_TYPE_KEY, "netty");
return builder.build();
}
@Override
protected Settings transportClientSettings() {
Settings.Builder builder = Settings.builder().put(super.transportClientSettings());
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty");
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(NettyPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return pluginList(NettyPlugin.class);
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.http.netty;
import org.elasticsearch.ESNettyIntegTestCase;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
@ -45,7 +46,7 @@ import static org.hamcrest.Matchers.hasSize;
* a single node "cluster". We also force test infrastructure to use the node client instead of the transport client for the same reason.
*/
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numClientNodes = 0, numDataNodes = 1, transportClientRatio = 0)
public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase {
public class NettyHttpRequestSizeLimitIT extends ESNettyIntegTestCase {
private static final ByteSizeValue LIMIT = new ByteSizeValue(2, ByteSizeUnit.KB);
@Override

View File

@ -95,7 +95,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
.build();
httpServerTransport = new CustomNettyHttpServerTransport(settings);
httpServerTransport.start();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress()
.boundAddresses());
List<String> requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
@ -112,7 +113,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
.build();
httpServerTransport = new CustomNettyHttpServerTransport(settings);
httpServerTransport.start();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress()
.boundAddresses());
List<String> requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
@ -138,7 +140,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new CustomHttpChannelPipelineFactory(this, executorService, NettyHttpServerPipeliningTests.this.threadPool.getThreadContext());
return new CustomHttpChannelPipelineFactory(this, executorService, NettyHttpServerPipeliningTests.this.threadPool
.getThreadContext());
}
@Override
@ -152,7 +155,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
private final ExecutorService executorService;
public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService, ThreadContext threadContext) {
public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService,
ThreadContext threadContext) {
super(transport, randomBoolean(), threadContext);
this.executorService = executorService;
}
@ -214,7 +218,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep") ? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0;
final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep")
? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0;
if (timeout > 0) {
try {
Thread.sleep(timeout);

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.http.netty;
import org.elasticsearch.ESNettyIntegTestCase;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -41,7 +42,7 @@ import static org.hamcrest.Matchers.hasSize;
*
*/
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyPipeliningDisabledIT extends ESIntegTestCase {
public class NettyPipeliningDisabledIT extends ESNettyIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()

View File

@ -18,16 +18,16 @@
*/
package org.elasticsearch.http.netty;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ExternalTestCluster;
import org.elasticsearch.transport.NettyPlugin;
import org.jboss.netty.handler.codec.http.HttpResponse;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Locale;
@ -36,26 +36,19 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyPipeliningEnabledIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
.put("http.pipelining", true)
.build();
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return pluginList(NettyPlugin.class);
}
public void testThatNettyHttpServerSupportsPipelining() throws Exception {
String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"};
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses);
InetSocketAddress inetSocketAddress = randomFrom(cluster().httpAddresses());
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests);
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketAddress, requests);
assertThat(responses, hasSize(5));
Collection<String> opaqueIds = returnOpaqueIds(responses);

View File

@ -19,11 +19,8 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.ESNettyIntegTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -46,8 +47,10 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@ -56,21 +59,22 @@ import static org.hamcrest.Matchers.is;
*
*/
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyTransportIT extends ESIntegTestCase {
public class NettyTransportIT extends ESNettyIntegTestCase {
// static so we can use it in anonymous classes
private static String channelProfileName = null;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(Node.NODE_MODE_SETTING.getKey(), "network")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "exception-throwing").build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(ExceptionThrowingNettyTransport.TestPlugin.class);
List<Class<? extends Plugin>> list = new ArrayList<>();
list.add(ExceptionThrowingNettyTransport.TestPlugin.class);
list.addAll(super.nodePlugins());
return Collections.unmodifiableCollection(list);
}
public void testThatConnectionFailsAsIntended() throws Exception {

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.ESNettyIntegTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -30,11 +31,10 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.transport.NettyPlugin;
import java.net.InetAddress;
import java.util.Locale;
@ -48,7 +48,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0)
public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
public class NettyTransportMultiPortIntegrationIT extends ESNettyIntegTestCase {
private static int randomPort = -1;
private static String randomPortRange;
@ -62,8 +62,6 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
Settings.Builder builder = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("network.host", "127.0.0.1")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
.put(Node.NODE_MODE_SETTING.getKey(), "network")
.put("transport.profiles.client1.port", randomPortRange)
.put("transport.profiles.client1.publish_host", "127.0.0.7")
.put("transport.profiles.client1.publish_port", "4321")
@ -77,7 +75,7 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
try (TransportClient transportClient = TransportClient.builder().settings(settings).build()) {
try (TransportClient transportClient = TransportClient.builder().addPlugin(NettyPlugin.class).settings(settings).build()) {
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), randomPort));
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.ESNettyIntegTestCase;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.common.network.NetworkModule;
@ -41,14 +42,7 @@ import static org.hamcrest.Matchers.instanceOf;
* different ports on ipv4 and ipv6.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class NettyTransportPublishAddressIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
.put(Node.NODE_MODE_SETTING.getKey(), "network").build();
}
public class NettyTransportPublishAddressIT extends ESNettyIntegTestCase {
public void testDifferentPorts() throws Exception {
if (!NetworkUtils.SUPPORTS_V6) {

View File

@ -23,11 +23,16 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportSettings;
import java.net.InetAddress;
@ -39,10 +44,24 @@ import static org.hamcrest.Matchers.containsString;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
public static MockTransportService nettyFromThreadPool(
Settings settings,
ThreadPool threadPool, final Version version) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
namedWriteableRegistry, new NoneCircuitBreakerService()) {
@Override
protected Version getCurrentVersion() {
return version;
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool);
}
@Override
protected MockTransportService build(Settings settings, Version version) {
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, threadPool, version);
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version);
transportService.start();
return transportService;
}

View File

@ -0,0 +1,13 @@
# Integration tests for Netty transport
#
"Netty loaded":
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
- do:
nodes.info: {}
- match: { nodes.$master.modules.0.name: transport-netty }

View File

@ -24,6 +24,7 @@ List projects = [
'modules:lang-groovy',
'modules:lang-mustache',
'modules:lang-painless',
'modules:transport-netty',
'modules:reindex',
'modules:percolator',
'plugins:analysis-icu',

View File

@ -86,13 +86,11 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.Assert;
import java.io.Closeable;
@ -414,9 +412,8 @@ public final class InternalTestCluster extends TestCluster {
}
}
// randomize netty settings
// randomize tcp settings
if (random.nextBoolean()) {
builder.put(NettyTransport.WORKER_COUNT.getKey(), random.nextInt(3) + 1);
builder.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
builder.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
builder.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -51,7 +50,6 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;
import java.io.IOException;
import java.util.Arrays;
@ -101,21 +99,6 @@ public class MockTransportService extends TransportService {
return new MockTransportService(settings, transport, threadPool);
}
public static MockTransportService nettyFromThreadPool(
Settings settings,
ThreadPool threadPool, final Version version) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
namedWriteableRegistry, new NoneCircuitBreakerService()) {
@Override
protected Version getCurrentVersion() {
return version;
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool);
}
private final Transport original;
@Inject