mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
Merge pull request #19392 from elastic/modularize_netty
This moves all netty related code into modules/transport-netty the module is build as a zip file as well as a JAR to serve as a dependency for transport client. For the time being this is required otherwise we have no network based impl. for transport client users. This might be subject to change given that we move forward http client.
This commit is contained in:
commit
814c7224f9
@ -848,7 +848,6 @@
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocatorTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReusePeerRecoverySharedTest.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]get[/\\]GetActionIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpServerPipeliningTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexModuleTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexServiceTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexWithShadowReplicasIT.java" checks="LineLength" />
|
||||
|
@ -74,8 +74,6 @@ dependencies {
|
||||
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
|
||||
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
|
||||
|
||||
// network stack
|
||||
compile 'io.netty:netty:3.10.6.Final'
|
||||
// percentiles aggregation
|
||||
compile 'com.tdunning:t-digest:3.0'
|
||||
// precentil ranks aggregation
|
||||
@ -152,26 +150,11 @@ processResources {
|
||||
}
|
||||
|
||||
thirdPartyAudit.excludes = [
|
||||
// uses internal java api: sun.security.x509 (X509CertInfo, X509CertImpl, X500Name)
|
||||
'org.jboss.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
|
||||
|
||||
// classes are missing!
|
||||
|
||||
// from com.fasterxml.jackson.dataformat.yaml.YAMLMapper (jackson-dataformat-yaml)
|
||||
'com.fasterxml.jackson.databind.ObjectMapper',
|
||||
|
||||
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder (netty)
|
||||
'com.google.protobuf.CodedInputStream',
|
||||
|
||||
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender (netty)
|
||||
'com.google.protobuf.CodedOutputStream',
|
||||
|
||||
// from org.jboss.netty.handler.codec.protobuf.ProtobufDecoder (netty)
|
||||
'com.google.protobuf.ExtensionRegistry',
|
||||
'com.google.protobuf.MessageLite$Builder',
|
||||
'com.google.protobuf.MessageLite',
|
||||
'com.google.protobuf.Parser',
|
||||
|
||||
// from org.apache.log4j.receivers.net.JMSReceiver (log4j-extras)
|
||||
'javax.jms.Message',
|
||||
'javax.jms.MessageListener',
|
||||
@ -196,72 +179,8 @@ thirdPartyAudit.excludes = [
|
||||
'javax.mail.internet.MimeMessage',
|
||||
'javax.mail.internet.MimeMultipart',
|
||||
'javax.mail.internet.MimeUtility',
|
||||
|
||||
// from org.jboss.netty.channel.socket.http.HttpTunnelingServlet (netty)
|
||||
'javax.servlet.ServletConfig',
|
||||
'javax.servlet.ServletException',
|
||||
'javax.servlet.ServletOutputStream',
|
||||
'javax.servlet.http.HttpServlet',
|
||||
'javax.servlet.http.HttpServletRequest',
|
||||
'javax.servlet.http.HttpServletResponse',
|
||||
|
||||
// from org.jboss.netty.logging.CommonsLoggerFactory (netty)
|
||||
'org.apache.commons.logging.Log',
|
||||
'org.apache.commons.logging.LogFactory',
|
||||
|
||||
// from org.jboss.netty.handler.ssl.OpenSslEngine (netty)
|
||||
'org.apache.tomcat.jni.Buffer',
|
||||
'org.apache.tomcat.jni.Library',
|
||||
'org.apache.tomcat.jni.Pool',
|
||||
'org.apache.tomcat.jni.SSL',
|
||||
'org.apache.tomcat.jni.SSLContext',
|
||||
|
||||
// from org.jboss.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
|
||||
'org.bouncycastle.asn1.x500.X500Name',
|
||||
'org.bouncycastle.cert.X509v3CertificateBuilder',
|
||||
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
|
||||
'org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder',
|
||||
'org.bouncycastle.jce.provider.BouncyCastleProvider',
|
||||
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
|
||||
|
||||
// from org.jboss.netty.handler.ssl.JettyNpnSslEngine (netty)
|
||||
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
|
||||
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
|
||||
'org.eclipse.jetty.npn.NextProtoNego',
|
||||
|
||||
// from org.jboss.netty.logging.JBossLoggerFactory (netty)
|
||||
'org.jboss.logging.Logger',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
|
||||
'org.jboss.marshalling.ByteInput',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
|
||||
'org.jboss.marshalling.ByteOutput',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
|
||||
'org.jboss.marshalling.Marshaller',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
|
||||
'org.jboss.marshalling.MarshallerFactory',
|
||||
'org.jboss.marshalling.MarshallingConfiguration',
|
||||
'org.jboss.marshalling.Unmarshaller',
|
||||
|
||||
// from org.locationtech.spatial4j.io.GeoJSONReader (spatial4j)
|
||||
'org.noggit.JSONParser',
|
||||
|
||||
// from org.jboss.netty.container.osgi.NettyBundleActivator (netty)
|
||||
'org.osgi.framework.BundleActivator',
|
||||
'org.osgi.framework.BundleContext',
|
||||
|
||||
// from org.jboss.netty.logging.OsgiLoggerFactory$1 (netty)
|
||||
'org.osgi.framework.ServiceReference',
|
||||
'org.osgi.service.log.LogService',
|
||||
'org.osgi.util.tracker.ServiceTracker',
|
||||
'org.osgi.util.tracker.ServiceTrackerCustomizer',
|
||||
|
||||
// from org.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional
|
||||
'org.slf4j.Logger',
|
||||
'org.slf4j.LoggerFactory',
|
||||
]
|
||||
|
||||
// dependency license are currently checked in distribution
|
||||
|
@ -19,11 +19,9 @@
|
||||
|
||||
package org.elasticsearch.common.compress;
|
||||
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -20,8 +20,6 @@
|
||||
package org.elasticsearch.common.network;
|
||||
|
||||
import org.elasticsearch.action.support.replication.ReplicationTask;
|
||||
import org.elasticsearch.client.transport.TransportClientNodesService;
|
||||
import org.elasticsearch.client.transport.support.TransportProxyClient;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
|
||||
@ -41,13 +39,11 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.ExtensionPoint;
|
||||
import org.elasticsearch.http.HttpServer;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||
import org.elasticsearch.tasks.RawTaskStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
/**
|
||||
* A module to handle registering and binding all network related classes.
|
||||
@ -91,14 +87,9 @@ public class NetworkModule extends AbstractModule {
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
registerTransportService(NETTY_TRANSPORT, TransportService.class);
|
||||
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
|
||||
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
|
||||
registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
|
||||
registerTaskStatus(RawTaskStatus.NAME, RawTaskStatus::new);
|
||||
registerBuiltinAllocationCommands();
|
||||
|
||||
if (transportClient == false) {
|
||||
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isTransportClient() {
|
||||
@ -156,8 +147,9 @@ public class NetworkModule extends AbstractModule {
|
||||
bind(NetworkService.class).toInstance(networkService);
|
||||
bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
|
||||
|
||||
boolean nettyRegistered = transportTypes.getExtension(NETTY_TRANSPORT) != null;
|
||||
transportServiceTypes.bindType(binder(), settings, TRANSPORT_SERVICE_TYPE_KEY, NETTY_TRANSPORT);
|
||||
String defaultTransport = DiscoveryNode.isLocalNode(settings) ? LOCAL_TRANSPORT : NETTY_TRANSPORT;
|
||||
String defaultTransport = DiscoveryNode.isLocalNode(settings) || nettyRegistered == false ? LOCAL_TRANSPORT : NETTY_TRANSPORT;
|
||||
transportTypes.bindType(binder(), settings, TRANSPORT_TYPE_KEY, defaultTransport);
|
||||
|
||||
if (transportClient == false) {
|
||||
@ -185,4 +177,8 @@ public class NetworkModule extends AbstractModule {
|
||||
AllocateStalePrimaryAllocationCommand.COMMAND_NAME_FIELD);
|
||||
|
||||
}
|
||||
|
||||
public boolean canRegisterHttpExtensions() {
|
||||
return transportClient == false;
|
||||
}
|
||||
}
|
||||
|
@ -61,7 +61,6 @@ import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||
import org.elasticsearch.http.HttpTransportSettings;
|
||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndexingMemoryController;
|
||||
@ -92,7 +91,6 @@ import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.elasticsearch.tribe.TribeService;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
@ -241,18 +239,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE,
|
||||
HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH,
|
||||
HttpTransportSettings.SETTING_HTTP_RESET_COOKIES,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_MIN,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NettyHttpServerTransport.SETTING_HTTP_WORKER_COUNT,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
|
||||
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
@ -278,7 +264,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
TransportSettings.BIND_HOST,
|
||||
TransportSettings.PUBLISH_PORT,
|
||||
TransportSettings.PORT,
|
||||
NettyTransport.WORKER_COUNT,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_REG,
|
||||
@ -287,13 +272,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
TcpTransport.PING_SCHEDULE,
|
||||
TcpTransport.TCP_BLOCKING_CLIENT,
|
||||
TcpTransport.TCP_CONNECT_TIMEOUT,
|
||||
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MIN,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NetworkService.NETWORK_SERVER,
|
||||
NettyTransport.NETTY_BOSS_COUNT,
|
||||
TcpTransport.TCP_NO_DELAY,
|
||||
TcpTransport.TCP_KEEP_ALIVE,
|
||||
TcpTransport.TCP_REUSE_ADDRESS,
|
||||
|
@ -83,14 +83,6 @@ grant {
|
||||
permission java.util.PropertyPermission "junit4.childvm.count", "write";
|
||||
permission java.util.PropertyPermission "junit4.childvm.id", "write";
|
||||
|
||||
// set by NettyTransport/NettyHttpServerTransport based on another parameter
|
||||
// TODO: look into this and decide if users should simply set the actual sysprop?!
|
||||
permission java.util.PropertyPermission "org.jboss.netty.epollBugWorkaround", "write";
|
||||
|
||||
// Netty SelectorUtil wants to change this, because of https://bugs.openjdk.java.net/browse/JDK-6427854
|
||||
// the bug says it only happened rarely, and that its fixed, but apparently it still happens rarely!
|
||||
permission java.util.PropertyPermission "sun.nio.ch.bugLevel", "write";
|
||||
|
||||
// needed by Settings
|
||||
permission java.lang.RuntimePermission "getenv.*";
|
||||
|
||||
|
@ -108,7 +108,9 @@ public class NetworkModuleTests extends ModuleTestCase {
|
||||
}
|
||||
|
||||
public void testRegisterTransportService() {
|
||||
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "custom").build();
|
||||
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "custom")
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
|
||||
.build();
|
||||
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
|
||||
module.registerTransportService("custom", FakeTransportService.class);
|
||||
assertBinding(module, TransportService.class, FakeTransportService.class);
|
||||
@ -122,7 +124,9 @@ public class NetworkModuleTests extends ModuleTestCase {
|
||||
}
|
||||
|
||||
public void testRegisterTransport() {
|
||||
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "custom").build();
|
||||
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "custom")
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
|
||||
.build();
|
||||
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
|
||||
module.registerTransport("custom", FakeTransport.class);
|
||||
assertBinding(module, Transport.class, FakeTransport.class);
|
||||
|
@ -39,10 +39,10 @@ import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.MockTcpTransport;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -198,13 +198,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
|
||||
private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId,
|
||||
Version version) {
|
||||
NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getCurrentVersion() {
|
||||
return version;
|
||||
}
|
||||
};
|
||||
MockTcpTransport transport = new MockTcpTransport("mock", settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(), new NamedWriteableRegistry(), networkService, version);
|
||||
final TransportService transportService = new TransportService(settings, transport, threadPool);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
@ -20,7 +20,6 @@
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
@ -32,7 +31,6 @@ import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
@ -45,27 +43,27 @@ import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class NettyTransportServiceHandshakeTests extends ESTestCase {
|
||||
public class TransportServiceHandshakeTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool threadPool;
|
||||
private static final long timeout = Long.MAX_VALUE;
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
threadPool = new TestThreadPool(NettyTransportServiceHandshakeTests.class.getSimpleName());
|
||||
threadPool = new TestThreadPool(TransportServiceHandshakeTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
private List<TransportService> transportServices = new ArrayList<>();
|
||||
|
||||
private NetworkHandle startServices(String nodeNameAndId, Settings settings, Version version) {
|
||||
NettyTransport transport =
|
||||
new NettyTransport(
|
||||
MockTcpTransport transport =
|
||||
new MockTcpTransport("mock",
|
||||
settings,
|
||||
threadPool,
|
||||
new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(),
|
||||
new NamedWriteableRegistry(),
|
||||
new NoneCircuitBreakerService());
|
||||
new NetworkService(settings));
|
||||
TransportService transportService = new MockTransportService(settings, transport, threadPool);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
@ -131,6 +132,7 @@ public class TribeIT extends ESIntegTestCase {
|
||||
.put("tribe.t1.cluster.name", internalCluster().getClusterName())
|
||||
.put("tribe.t2.cluster.name", cluster2.getClusterName())
|
||||
.put("tribe.blocks.write", false)
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
|
||||
.put(settings)
|
||||
.put(tribe1Defaults.build())
|
||||
.put(tribe2Defaults.build())
|
||||
|
@ -49,7 +49,7 @@ ext.dependencyFiles = project(':core').configurations.runtime.copyRecursive()
|
||||
|
||||
task buildModules(type: Sync) {
|
||||
into 'build/modules'
|
||||
}
|
||||
}
|
||||
|
||||
ext.restTestExpansions = [
|
||||
'expected.modules.count': 0,
|
||||
@ -82,6 +82,21 @@ project.rootProject.subprojects.findAll { it.path.startsWith(':modules:') }.each
|
||||
restTestExpansions['expected.modules.count'] += 1
|
||||
}
|
||||
|
||||
// Integ tests work over the rest http layer, so we need a transport included with the integ test zip.
|
||||
// All transport modules are included so that they may be randomized for testing
|
||||
task buildTransportModules(type: Sync) {
|
||||
into 'build/transport-modules'
|
||||
}
|
||||
|
||||
project.rootProject.subprojects.findAll { it.path.startsWith(':modules:transport-') }.each { Project transport ->
|
||||
buildTransportModules {
|
||||
dependsOn({ project(transport.path).bundlePlugin })
|
||||
into(transport.name) {
|
||||
from { zipTree(project(transport.path).bundlePlugin.outputs.files.singleFile) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make sure we have a clean task since we aren't a java project, but we have tasks that
|
||||
// put stuff in the build dir
|
||||
task clean(type: Delete) {
|
||||
@ -145,6 +160,11 @@ subprojects {
|
||||
from project(':distribution').buildModules
|
||||
}
|
||||
|
||||
transportModulesFiles = copySpec {
|
||||
into "modules"
|
||||
from project(':distribution').buildTransportModules
|
||||
}
|
||||
|
||||
configFiles = copySpec {
|
||||
from '../src/main/resources/config'
|
||||
MavenFilteringHack.filter(it, expansions)
|
||||
@ -209,6 +229,8 @@ configure(subprojects.findAll { ['zip', 'tar', 'integ-test-zip'].contains(it.nam
|
||||
}
|
||||
if (project.name != 'integ-test-zip') {
|
||||
with modulesFiles
|
||||
} else {
|
||||
with transportModulesFiles
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -42,6 +42,8 @@ dependencies {
|
||||
compile "org.apache.httpcomponents:httpcore:${versions.httpcore}"
|
||||
compile "commons-codec:commons-codec:${versions.commonscodec}"
|
||||
compile "commons-logging:commons-logging:${versions.commonslogging}"
|
||||
// for http - testing reindex from remote
|
||||
testCompile project(path: ':modules:transport-netty', configuration: 'runtime')
|
||||
}
|
||||
|
||||
dependencyLicenses {
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.Retry;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
@ -35,11 +36,13 @@ import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
@ -59,7 +62,19 @@ public class RetryTests extends ESSingleNodeTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return pluginList(ReindexPlugin.class);
|
||||
return pluginList(ReindexPlugin.class, NettyPlugin.class, BogusPlugin.class); // we need netty here to http communication
|
||||
}
|
||||
|
||||
public static final class BogusPlugin extends Plugin {
|
||||
// se NettyPlugin.... this runs without the permission from the netty module so it will fail since reindex can't set the property
|
||||
// to make it still work we disable that check but need to register the setting first
|
||||
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
|
||||
Setting.Property.NodeScope);
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(ASSERT_NETTY_BUGLEVEL);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -69,6 +84,7 @@ public class RetryTests extends ESSingleNodeTestCase {
|
||||
protected Settings nodeSettings() {
|
||||
Settings.Builder settings = Settings.builder().put(super.nodeSettings());
|
||||
// Use pools of size 1 so we can block them
|
||||
settings.put("netty.assert.buglevel", false);
|
||||
settings.put("thread_pool.bulk.size", 1);
|
||||
settings.put("thread_pool.search.size", 1);
|
||||
// Use queues of size 1 because size 0 is broken and because search requests need the queue to function
|
||||
@ -188,7 +204,7 @@ public class RetryTests extends ESSingleNodeTestCase {
|
||||
barrier.await();
|
||||
logger.info("Blocked the [{}] executor", name);
|
||||
barrier.await();
|
||||
logger.info("Ublocking the [{}] executor", name);
|
||||
logger.info("Unblocking the [{}] executor", name);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
118
modules/transport-netty/build.gradle
Normal file
118
modules/transport-netty/build.gradle
Normal file
@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
TODOs:
|
||||
* fix permissions such that only netty can open sockets etc?
|
||||
* fix the hack in the build framework that copies transport-netty into the integ test cluster
|
||||
* maybe figure out a way to run all tests from core with netty/network?
|
||||
*/
|
||||
esplugin {
|
||||
description 'Netty 3 based transport implementation'
|
||||
classname 'org.elasticsearch.transport.NettyPlugin'
|
||||
}
|
||||
|
||||
compileTestJava.options.compilerArgs << "-Xlint:-cast,-deprecation,-rawtypes,-try,-unchecked"
|
||||
|
||||
dependencies {
|
||||
// network stack
|
||||
compile 'io.netty:netty:3.10.6.Final'
|
||||
}
|
||||
|
||||
thirdPartyAudit.excludes = [
|
||||
// uses internal java api: sun.security.x509 (X509CertInfo, X509CertImpl, X500Name)
|
||||
'org.jboss.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
|
||||
// classes are missing
|
||||
|
||||
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder (netty)
|
||||
'com.google.protobuf.CodedInputStream',
|
||||
|
||||
// from org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender (netty)
|
||||
'com.google.protobuf.CodedOutputStream',
|
||||
|
||||
// from org.jboss.netty.handler.codec.protobuf.ProtobufDecoder (netty)
|
||||
'com.google.protobuf.ExtensionRegistry',
|
||||
'com.google.protobuf.MessageLite$Builder',
|
||||
'com.google.protobuf.MessageLite',
|
||||
'com.google.protobuf.Parser',
|
||||
|
||||
// from org.jboss.netty.channel.socket.http.HttpTunnelingServlet (netty)
|
||||
'javax.servlet.ServletConfig',
|
||||
'javax.servlet.ServletException',
|
||||
'javax.servlet.ServletOutputStream',
|
||||
'javax.servlet.http.HttpServlet',
|
||||
'javax.servlet.http.HttpServletRequest',
|
||||
'javax.servlet.http.HttpServletResponse',
|
||||
|
||||
// from org.jboss.netty.logging.CommonsLoggerFactory (netty)
|
||||
'org.apache.commons.logging.Log',
|
||||
'org.apache.commons.logging.LogFactory',
|
||||
|
||||
// from org.jboss.netty.handler.ssl.OpenSslEngine (netty)
|
||||
'org.apache.tomcat.jni.Buffer',
|
||||
'org.apache.tomcat.jni.Library',
|
||||
'org.apache.tomcat.jni.Pool',
|
||||
'org.apache.tomcat.jni.SSL',
|
||||
'org.apache.tomcat.jni.SSLContext',
|
||||
|
||||
// from org.jboss.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
|
||||
'org.bouncycastle.asn1.x500.X500Name',
|
||||
'org.bouncycastle.cert.X509v3CertificateBuilder',
|
||||
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
|
||||
'org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder',
|
||||
'org.bouncycastle.jce.provider.BouncyCastleProvider',
|
||||
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
|
||||
|
||||
// from org.jboss.netty.handler.ssl.JettyNpnSslEngine (netty)
|
||||
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
|
||||
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
|
||||
'org.eclipse.jetty.npn.NextProtoNego',
|
||||
|
||||
// from org.jboss.netty.logging.JBossLoggerFactory (netty)
|
||||
'org.jboss.logging.Logger',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
|
||||
'org.jboss.marshalling.ByteInput',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
|
||||
'org.jboss.marshalling.ByteOutput',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
|
||||
'org.jboss.marshalling.Marshaller',
|
||||
|
||||
// from org.jboss.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
|
||||
'org.jboss.marshalling.MarshallerFactory',
|
||||
'org.jboss.marshalling.MarshallingConfiguration',
|
||||
'org.jboss.marshalling.Unmarshaller',
|
||||
|
||||
// from org.jboss.netty.container.osgi.NettyBundleActivator (netty)
|
||||
'org.osgi.framework.BundleActivator',
|
||||
'org.osgi.framework.BundleContext',
|
||||
|
||||
// from org.jboss.netty.logging.OsgiLoggerFactory$1 (netty)
|
||||
'org.osgi.framework.ServiceReference',
|
||||
'org.osgi.service.log.LogService',
|
||||
'org.osgi.util.tracker.ServiceTracker',
|
||||
'org.osgi.util.tracker.ServiceTrackerCustomizer',
|
||||
|
||||
// from org.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional
|
||||
'org.slf4j.Logger',
|
||||
'org.slf4j.LoggerFactory',
|
||||
]
|
||||
|
@ -110,9 +110,6 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
|
||||
import static org.elasticsearch.http.netty.cors.CorsHandler.ANY_ORIGIN;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class NettyHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
|
||||
|
||||
static {
|
||||
@ -284,19 +281,17 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.serverOpenChannels = new OpenChannelsHandler(logger);
|
||||
|
||||
if (blockingServer) {
|
||||
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_worker"))
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_worker"))
|
||||
));
|
||||
} else {
|
||||
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_worker")),
|
||||
workerCount));
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_worker")),
|
||||
workerCount));
|
||||
}
|
||||
|
||||
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
|
||||
|
||||
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
|
@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class NettyPlugin extends Plugin {
|
||||
|
||||
|
||||
public static final String NETTY_TRANSPORT_NAME = "netty";
|
||||
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty";
|
||||
|
||||
public NettyPlugin(Settings settings) {
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
if (sm != null) {
|
||||
sm.checkPermission(new SpecialPermission());
|
||||
}
|
||||
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
|
||||
try {
|
||||
Class.forName("org.jboss.netty.channel.socket.nio.SelectorUtil");
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new AssertionError(e); // we don't do anything with this
|
||||
}
|
||||
return null;
|
||||
});
|
||||
/*
|
||||
* Asserts that sun.nio.ch.bugLevel has been set to a non-null value. This assertion will fail if the corresponding code
|
||||
* is not executed in a doPrivileged block. This can be disabled via `netty.assert.buglevel` setting which isn't registered
|
||||
* by default but test can do so if they depend on the jar instead of the module.
|
||||
*/
|
||||
//TODO Once we have no jar level dependency we can get rid of this.
|
||||
if (settings.getAsBoolean("netty.assert.buglevel", true)) {
|
||||
assert System.getProperty("sun.nio.ch.bugLevel") != null :
|
||||
"sun.nio.ch.bugLevel is null somebody pulls in SelectorUtil without doing stuff in a doPrivileged block?";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyHttpServerTransport.SETTING_HTTP_WORKER_COUNT,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
|
||||
NettyTransport.WORKER_COUNT,
|
||||
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MIN,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NettyTransport.NETTY_BOSS_COUNT
|
||||
);
|
||||
}
|
||||
|
||||
public void onModule(NetworkModule networkModule) {
|
||||
if (networkModule.canRegisterHttpExtensions()) {
|
||||
networkModule.registerHttpTransport(NETTY_HTTP_TRANSPORT_NAME, NettyHttpServerTransport.class);
|
||||
}
|
||||
networkModule.registerTransport(NETTY_TRANSPORT_NAME, NettyTransport.class);
|
||||
}
|
||||
}
|
@ -182,6 +182,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
||||
}
|
||||
|
||||
private ClientBootstrap createClientBootstrap() {
|
||||
// this doPrivileged is for SelectorUtil.java that tries to set "sun.nio.ch.bugLevel"
|
||||
if (blockingClient) {
|
||||
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
|
||||
@ -280,7 +281,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
||||
|
||||
final ThreadFactory bossFactory = daemonThreadFactory(this.settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX, name);
|
||||
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, name);
|
||||
ServerBootstrap serverBootstrap;
|
||||
final ServerBootstrap serverBootstrap;
|
||||
if (blockingServer) {
|
||||
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(bossFactory),
|
@ -105,7 +105,6 @@ public class NettyUtils {
|
||||
}
|
||||
|
||||
public static void setup() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
grant {
|
||||
// Netty SelectorUtil wants to change this, because of https://bugs.openjdk.java.net/browse/JDK-6427854
|
||||
// the bug says it only happened rarely, and that its fixed, but apparently it still happens rarely!
|
||||
permission java.util.PropertyPermission "sun.nio.ch.bugLevel", "write";
|
||||
};
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@ESIntegTestCase.SuppressLocalMode
|
||||
public abstract class ESNettyIntegTestCase extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected boolean ignoreExternalCluster() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean addMockTransportService() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
|
||||
// randomize netty settings
|
||||
if (randomBoolean()) {
|
||||
builder.put(NettyTransport.WORKER_COUNT.getKey(), random().nextInt(3) + 1);
|
||||
}
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty");
|
||||
builder.put(NetworkModule.HTTP_TYPE_KEY, "netty");
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings transportClientSettings() {
|
||||
Settings.Builder builder = Settings.builder().put(super.transportClientSettings());
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty");
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(NettyPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return pluginList(NettyPlugin.class);
|
||||
}
|
||||
|
||||
}
|
@ -18,6 +18,7 @@
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -45,7 +46,7 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
* a single node "cluster". We also force test infrastructure to use the node client instead of the transport client for the same reason.
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numClientNodes = 0, numDataNodes = 1, transportClientRatio = 0)
|
||||
public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase {
|
||||
public class NettyHttpRequestSizeLimitIT extends ESNettyIntegTestCase {
|
||||
private static final ByteSizeValue LIMIT = new ByteSizeValue(2, ByteSizeUnit.KB);
|
||||
|
||||
@Override
|
@ -95,7 +95,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
.build();
|
||||
httpServerTransport = new CustomNettyHttpServerTransport(settings);
|
||||
httpServerTransport.start();
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress()
|
||||
.boundAddresses());
|
||||
|
||||
List<String> requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
@ -112,7 +113,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
.build();
|
||||
httpServerTransport = new CustomNettyHttpServerTransport(settings);
|
||||
httpServerTransport.start();
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress()
|
||||
.boundAddresses());
|
||||
|
||||
List<String> requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
@ -138,7 +140,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
|
||||
@Override
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
|
||||
return new CustomHttpChannelPipelineFactory(this, executorService, NettyHttpServerPipeliningTests.this.threadPool.getThreadContext());
|
||||
return new CustomHttpChannelPipelineFactory(this, executorService, NettyHttpServerPipeliningTests.this.threadPool
|
||||
.getThreadContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -152,7 +155,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService, ThreadContext threadContext) {
|
||||
public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService,
|
||||
ThreadContext threadContext) {
|
||||
super(transport, randomBoolean(), threadContext);
|
||||
this.executorService = executorService;
|
||||
}
|
||||
@ -214,7 +218,8 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
|
||||
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
|
||||
|
||||
final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep") ? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0;
|
||||
final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep")
|
||||
? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0;
|
||||
if (timeout > 0) {
|
||||
try {
|
||||
Thread.sleep(timeout);
|
@ -18,6 +18,7 @@
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
@ -41,7 +42,7 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
*
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class NettyPipeliningDisabledIT extends ESIntegTestCase {
|
||||
public class NettyPipeliningDisabledIT extends ESNettyIntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
@ -18,16 +18,16 @@
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.ExternalTestCluster;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Locale;
|
||||
|
||||
@ -36,26 +36,19 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class NettyPipeliningEnabledIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
|
||||
.put("http.pipelining", true)
|
||||
.build();
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return pluginList(NettyPlugin.class);
|
||||
}
|
||||
|
||||
public void testThatNettyHttpServerSupportsPipelining() throws Exception {
|
||||
String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"};
|
||||
|
||||
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
|
||||
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses);
|
||||
|
||||
InetSocketAddress inetSocketAddress = randomFrom(cluster().httpAddresses());
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests);
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketAddress, requests);
|
||||
assertThat(responses, hasSize(5));
|
||||
|
||||
Collection<String> opaqueIds = returnOpaqueIds(responses);
|
@ -19,11 +19,8 @@
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
@ -18,6 +18,7 @@
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
@ -46,8 +47,10 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
@ -56,21 +59,22 @@ import static org.hamcrest.Matchers.is;
|
||||
*
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class NettyTransportIT extends ESIntegTestCase {
|
||||
public class NettyTransportIT extends ESNettyIntegTestCase {
|
||||
// static so we can use it in anonymous classes
|
||||
private static String channelProfileName = null;
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network")
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "exception-throwing").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(ExceptionThrowingNettyTransport.TestPlugin.class);
|
||||
List<Class<? extends Plugin>> list = new ArrayList<>();
|
||||
list.add(ExceptionThrowingNettyTransport.TestPlugin.class);
|
||||
list.addAll(super.nodePlugins());
|
||||
return Collections.unmodifiableCollection(list);
|
||||
}
|
||||
|
||||
public void testThatConnectionFailsAsIntended() throws Exception {
|
@ -18,6 +18,7 @@
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
@ -30,11 +31,10 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.junit.annotations.Network;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.Locale;
|
||||
@ -48,7 +48,7 @@ import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0)
|
||||
public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
|
||||
public class NettyTransportMultiPortIntegrationIT extends ESNettyIntegTestCase {
|
||||
|
||||
private static int randomPort = -1;
|
||||
private static String randomPortRange;
|
||||
@ -62,8 +62,6 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("network.host", "127.0.0.1")
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network")
|
||||
.put("transport.profiles.client1.port", randomPortRange)
|
||||
.put("transport.profiles.client1.publish_host", "127.0.0.7")
|
||||
.put("transport.profiles.client1.publish_port", "4321")
|
||||
@ -77,7 +75,7 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.build();
|
||||
try (TransportClient transportClient = TransportClient.builder().settings(settings).build()) {
|
||||
try (TransportClient transportClient = TransportClient.builder().addPlugin(NettyPlugin.class).settings(settings).build()) {
|
||||
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), randomPort));
|
||||
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
|
||||
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
@ -41,14 +42,7 @@ import static org.hamcrest.Matchers.instanceOf;
|
||||
* different ports on ipv4 and ipv6.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class NettyTransportPublishAddressIT extends ESIntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network").build();
|
||||
}
|
||||
public class NettyTransportPublishAddressIT extends ESNettyIntegTestCase {
|
||||
|
||||
public void testDifferentPorts() throws Exception {
|
||||
if (!NetworkUtils.SUPPORTS_V6) {
|
@ -23,11 +23,16 @@ import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
import java.net.InetAddress;
|
||||
@ -39,10 +44,24 @@ import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
|
||||
|
||||
public static MockTransportService nettyFromThreadPool(
|
||||
Settings settings,
|
||||
ThreadPool threadPool, final Version version) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getCurrentVersion() {
|
||||
return version;
|
||||
}
|
||||
};
|
||||
return new MockTransportService(Settings.EMPTY, transport, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MockTransportService build(Settings settings, Version version) {
|
||||
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
|
||||
MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, threadPool, version);
|
||||
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
# Integration tests for Netty transport
|
||||
#
|
||||
"Netty loaded":
|
||||
- do:
|
||||
cluster.state: {}
|
||||
|
||||
# Get master node id
|
||||
- set: { master_node: master }
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.modules.0.name: transport-netty }
|
@ -22,6 +22,8 @@ package org.elasticsearch.discovery.ec2;
|
||||
import com.amazonaws.util.IOUtils;
|
||||
import com.sun.net.httpserver.Headers;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.utils.URLEncodedUtils;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
@ -30,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
@ -49,7 +50,6 @@ import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@ -101,9 +101,13 @@ public class Ec2DiscoveryClusterFormationTests extends ESIntegTestCase {
|
||||
httpServer.createContext("/", (s) -> {
|
||||
Headers headers = s.getResponseHeaders();
|
||||
headers.add("Content-Type", "text/xml; charset=UTF-8");
|
||||
QueryStringDecoder decoder = new QueryStringDecoder("?" + IOUtils.toString(s.getRequestBody()));
|
||||
Map<String, List<String>> queryParams = decoder.getParameters();
|
||||
String action = queryParams.get("Action").get(0);
|
||||
String action = null;
|
||||
for (NameValuePair parse : URLEncodedUtils.parse(IOUtils.toString(s.getRequestBody()), StandardCharsets.UTF_8)) {
|
||||
if ("Action".equals(parse.getName())) {
|
||||
action = parse.getValue();
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertThat(action, equalTo("DescribeInstances"));
|
||||
|
||||
XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
|
||||
|
@ -20,3 +20,7 @@
|
||||
apply plugin: 'elasticsearch.rest-test'
|
||||
|
||||
// TODO: this test works, but it isn't really a rest test...should we have another plugin for "non rest test that just needs N clusters?"
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: ':modules:transport-netty', configuration: 'runtime') // randomly swapped in as a transport
|
||||
}
|
@ -25,12 +25,18 @@ import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.internal.InternalSettingsPreparer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
@ -41,6 +47,8 @@ import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@ -72,14 +80,34 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
|
||||
private static String clusterAddresses;
|
||||
protected String index;
|
||||
|
||||
private static Client startClient(Path tempDir, TransportAddress... transportAddresses) {
|
||||
Settings clientSettings = Settings.builder()
|
||||
.put("node.name", "qa_smoke_client_" + counter.getAndIncrement())
|
||||
.put("client.transport.ignore_cluster_name", true)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network").build(); // we require network here!
|
||||
public static final class BogusPlugin extends Plugin {
|
||||
// se NettyPlugin.... this runs without the permission from the netty module so it will fail since reindex can't set the property
|
||||
// to make it still work we disable that check but need to register the setting first
|
||||
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
|
||||
Setting.Property.NodeScope);
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(ASSERT_NETTY_BUGLEVEL);
|
||||
}
|
||||
}
|
||||
|
||||
TransportClient.Builder transportClientBuilder = TransportClient.builder().settings(clientSettings);
|
||||
private static Client startClient(Path tempDir, TransportAddress... transportAddresses) {
|
||||
TransportClient.Builder transportClientBuilder = TransportClient.builder();
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put("node.name", "qa_smoke_client_" + counter.getAndIncrement())
|
||||
.put("client.transport.ignore_cluster_name", true)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network");// we require network here!
|
||||
if (random().nextBoolean()) {
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NettyPlugin.NETTY_TRANSPORT_NAME);
|
||||
transportClientBuilder.addPlugin(NettyPlugin.class);
|
||||
transportClientBuilder.addPlugin(BogusPlugin.class);
|
||||
builder.put("netty.assert.buglevel", false); // see BogusPlugin
|
||||
} else {
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
|
||||
transportClientBuilder.addPlugin(MockTcpTransportPlugin.class);
|
||||
}
|
||||
transportClientBuilder.settings(builder.build());
|
||||
TransportClient client = transportClientBuilder.build().addTransportAddresses(transportAddresses);
|
||||
|
||||
logger.info("--> Elasticsearch Java TransportClient started");
|
||||
|
@ -18,3 +18,7 @@
|
||||
*/
|
||||
|
||||
apply plugin: 'elasticsearch.rest-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: ':modules:transport-netty', configuration: 'runtime') // for http
|
||||
}
|
@ -47,7 +47,6 @@ import org.elasticsearch.indices.TermsLookup;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
@ -74,7 +73,7 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@ClusterScope(scope = SUITE)
|
||||
public class ContextAndHeaderTransportIT extends ESIntegTestCase {
|
||||
public class ContextAndHeaderTransportIT extends HttpSmokeTestCase {
|
||||
private static final List<RequestAndHeaders> requests = new CopyOnWriteArrayList<>();
|
||||
private String randomHeaderKey = randomAsciiOfLength(10);
|
||||
private String randomHeaderValue = randomAsciiOfLength(20);
|
||||
@ -96,7 +95,9 @@ public class ContextAndHeaderTransportIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(ActionLoggingPlugin.class);
|
||||
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
|
||||
plugins.add(ActionLoggingPlugin.class);
|
||||
return plugins;
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -26,7 +26,7 @@ import org.elasticsearch.test.ESIntegTestCase;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class CorsNotSetIT extends ESIntegTestCase {
|
||||
public class CorsNotSetIT extends HttpSmokeTestCase {
|
||||
|
||||
|
||||
public void testCorsSettingDefaultBehaviourDoesNotReturnAnything() throws Exception {
|
||||
|
@ -21,14 +21,11 @@ package org.elasticsearch.http;
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS;
|
||||
@ -41,7 +38,7 @@ import static org.hamcrest.Matchers.nullValue;
|
||||
* Test CORS where the allow origin value is a regular expression.
|
||||
*/
|
||||
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class CorsRegexIT extends ESIntegTestCase {
|
||||
public class CorsRegexIT extends HttpSmokeTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
@ -105,7 +102,7 @@ public class CorsRegexIT extends ESIntegTestCase {
|
||||
String corsValue = "http://localhost:9200";
|
||||
try (Response response = getRestClient().performRequest("OPTIONS", "/",
|
||||
new BasicHeader("User-Agent", "Mozilla Bar"), new BasicHeader("Origin", corsValue),
|
||||
new BasicHeader(HttpHeaders.Names.ACCESS_CONTROL_REQUEST_METHOD, "GET"));) {
|
||||
new BasicHeader("Access-Control-Request-Method", "GET"));) {
|
||||
assertResponseWithOriginheader(response, corsValue);
|
||||
assertNotNull(response.getHeader("Access-Control-Allow-Methods"));
|
||||
}
|
||||
@ -115,7 +112,7 @@ public class CorsRegexIT extends ESIntegTestCase {
|
||||
try {
|
||||
getRestClient().performRequest("OPTIONS", "/", new BasicHeader("User-Agent", "Mozilla Bar"),
|
||||
new BasicHeader("Origin", "http://evil-host:9200"),
|
||||
new BasicHeader(HttpHeaders.Names.ACCESS_CONTROL_REQUEST_METHOD, "GET"));
|
||||
new BasicHeader("Access-Control-Request-Method", "GET"));
|
||||
fail("request should have failed");
|
||||
} catch(ResponseException e) {
|
||||
Response response = e.getResponse();
|
||||
|
@ -16,7 +16,7 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.rest;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpEntity;
|
||||
@ -29,10 +29,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.plugins.TestDeprecatedQueryBuilder;
|
||||
import org.elasticsearch.rest.plugins.TestDeprecationHeaderRestAction;
|
||||
import org.elasticsearch.rest.plugins.TestDeprecationPlugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
@ -45,9 +41,9 @@ import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
import static org.elasticsearch.rest.plugins.TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1;
|
||||
import static org.elasticsearch.rest.plugins.TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2;
|
||||
import static org.elasticsearch.rest.plugins.TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING;
|
||||
import static org.elasticsearch.http.TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1;
|
||||
import static org.elasticsearch.http.TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2;
|
||||
import static org.elasticsearch.http.TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
@ -56,7 +52,7 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
/**
|
||||
* Tests {@code DeprecationLogger} uses the {@code ThreadContext} to add response headers.
|
||||
*/
|
||||
public class DeprecationHttpIT extends ESIntegTestCase {
|
||||
public class DeprecationHttpIT extends HttpSmokeTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
@ -73,7 +69,9 @@ public class DeprecationHttpIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(TestDeprecationPlugin.class);
|
||||
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
|
||||
plugins.add(TestDeprecationPlugin.class);
|
||||
return plugins;
|
||||
}
|
||||
|
||||
/**
|
@ -36,7 +36,7 @@ import static org.hamcrest.Matchers.is;
|
||||
* Tests that when disabling detailed errors, a request with the error_trace parameter returns a HTTP 400
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class DetailedErrorsDisabledIT extends ESIntegTestCase {
|
||||
public class DetailedErrorsDisabledIT extends HttpSmokeTestCase {
|
||||
// Build our cluster settings
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
|
@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.not;
|
||||
/**
|
||||
* Tests that by default the error_trace parameter can be used to show stacktraces
|
||||
*/
|
||||
public class DetailedErrorsEnabledIT extends ESIntegTestCase {
|
||||
public class DetailedErrorsEnabledIT extends HttpSmokeTestCase {
|
||||
|
||||
public void testThatErrorTraceWorksByDefault() throws Exception {
|
||||
try {
|
||||
|
@ -33,7 +33,6 @@ import org.elasticsearch.test.ESIntegTestCase;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 1, numClientNodes = 1)
|
||||
public class HttpCompressionIT extends ESIntegTestCase {
|
||||
private static final String GZIP_ENCODING = "gzip";
|
||||
|
||||
@ -44,6 +43,10 @@ public class HttpCompressionIT extends ESIntegTestCase {
|
||||
" }\n" +
|
||||
"}", RestClient.JSON_CONTENT_TYPE);
|
||||
|
||||
@Override
|
||||
protected boolean ignoreExternalCluster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void testCompressesResponseIfRequested() throws Exception {
|
||||
ensureGreen();
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.MockTcpTransport;
|
||||
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class HttpSmokeTestCase extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("netty.assert.buglevel", false)
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, randomFrom(NettyPlugin.NETTY_TRANSPORT_NAME,
|
||||
MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME))
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), true).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(MockTcpTransportPlugin.class, NettyPlugin.class, BogusPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return pluginList(MockTcpTransportPlugin.class, NettyPlugin.class, BogusPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings transportClientSettings() {
|
||||
return Settings.builder()
|
||||
.put(super.transportClientSettings())
|
||||
.put("netty.assert.buglevel", false)
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, randomFrom(NettyPlugin.NETTY_TRANSPORT_NAME,
|
||||
MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean ignoreExternalCluster() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public static final class BogusPlugin extends Plugin {
|
||||
// see NettyPlugin.... this runs without the permission from the netty module so it will fail since reindex can't set the property
|
||||
// to make it still work we disable that check but need to register the setting first
|
||||
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
|
||||
Setting.Property.NodeScope);
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(ASSERT_NETTY_BUGLEVEL);
|
||||
}
|
||||
}
|
||||
}
|
@ -16,17 +16,17 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.plugins;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.responseheader.TestResponseHeaderPlugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.equalTo;
|
||||
* Test a rest action that sets special response headers
|
||||
*/
|
||||
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class ResponseHeaderPluginIT extends ESIntegTestCase {
|
||||
public class ResponseHeaderPluginIT extends HttpSmokeTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
@ -44,9 +44,16 @@ public class ResponseHeaderPluginIT extends ESIntegTestCase {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean ignoreExternalCluster() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(TestResponseHeaderPlugin.class);
|
||||
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
|
||||
plugins.add(TestResponseHeaderPlugin.class);
|
||||
return plugins;
|
||||
}
|
||||
|
||||
public void testThatSettingHeadersWorks() throws Exception {
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.rest.plugins;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.elasticsearch.common.ParseField;
|
@ -16,7 +16,7 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.rest.plugins;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.inject.Inject;
|
@ -16,7 +16,7 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.rest.plugins;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugins.responseheader;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
@ -16,7 +16,7 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.plugins.responseheader;
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.inject.Inject;
|
@ -24,6 +24,7 @@ List projects = [
|
||||
'modules:lang-groovy',
|
||||
'modules:lang-mustache',
|
||||
'modules:lang-painless',
|
||||
'modules:transport-netty',
|
||||
'modules:reindex',
|
||||
'modules:percolator',
|
||||
'plugins:analysis-icu',
|
||||
|
@ -1772,7 +1772,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
||||
@Override
|
||||
public Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
Collection<Class<? extends Plugin>> plugins = ESIntegTestCase.this.transportClientPlugins();
|
||||
if (isNetwork) {
|
||||
if (isNetwork && plugins.contains(MockTcpTransportPlugin.class) == false) {
|
||||
plugins = new ArrayList<>(plugins);
|
||||
plugins.add(MockTcpTransportPlugin.class);
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
@ -36,6 +37,8 @@ import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.internal.InternalSettingsPreparer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.MockTcpTransport;
|
||||
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -70,14 +73,22 @@ public final class ExternalTestCluster extends TestCluster {
|
||||
|
||||
public ExternalTestCluster(Path tempDir, Settings additionalSettings, Collection<Class<? extends Plugin>> pluginClasses, TransportAddress... transportAddresses) {
|
||||
super(0);
|
||||
Settings clientSettings = Settings.builder()
|
||||
.put(additionalSettings)
|
||||
.put("node.name", InternalTestCluster.TRANSPORT_CLIENT_PREFIX + EXTERNAL_CLUSTER_PREFIX + counter.getAndIncrement())
|
||||
.put("client.transport.ignore_cluster_name", true)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network").build(); // we require network here!
|
||||
|
||||
TransportClient.Builder transportClientBuilder = TransportClient.builder().settings(clientSettings);
|
||||
Settings.Builder clientSettingsBuilder = Settings.builder()
|
||||
.put(additionalSettings)
|
||||
.put("node.name", InternalTestCluster.TRANSPORT_CLIENT_PREFIX + EXTERNAL_CLUSTER_PREFIX + counter.getAndIncrement())
|
||||
.put("client.transport.ignore_cluster_name", true)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||
.put(Node.NODE_MODE_SETTING.getKey(), "network");// we require network here!
|
||||
TransportClient.Builder transportClientBuilder = TransportClient.builder();
|
||||
boolean addMockTcpTransport = additionalSettings.get(NetworkModule.TRANSPORT_TYPE_KEY) == null;
|
||||
if (addMockTcpTransport) {
|
||||
clientSettingsBuilder.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
|
||||
if (pluginClasses.contains(MockTcpTransportPlugin.class) == false) {
|
||||
transportClientBuilder.addPlugin(MockTcpTransportPlugin.class);
|
||||
}
|
||||
}
|
||||
Settings clientSettings = clientSettingsBuilder.build();
|
||||
transportClientBuilder.settings(clientSettings);
|
||||
for (Class<? extends Plugin> pluginClass : pluginClasses) {
|
||||
transportClientBuilder.addPlugin(pluginClass);
|
||||
}
|
||||
|
@ -86,13 +86,11 @@ import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.transport.AssertingLocalTransport;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.Closeable;
|
||||
@ -414,9 +412,8 @@ public final class InternalTestCluster extends TestCluster {
|
||||
}
|
||||
}
|
||||
|
||||
// randomize netty settings
|
||||
// randomize tcp settings
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(NettyTransport.WORKER_COUNT.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
|
||||
|
@ -29,12 +29,12 @@ public abstract class NodeConfigurationSource {
|
||||
public static final NodeConfigurationSource EMPTY = new NodeConfigurationSource() {
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
return null;
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings transportClientSettings() {
|
||||
return null;
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
};
|
||||
|
||||
@ -48,7 +48,9 @@ public abstract class NodeConfigurationSource {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
public abstract Settings transportClientSettings();
|
||||
public Settings transportClientSettings() {
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
|
||||
/** Returns plugins that should be loaded in the transport client */
|
||||
public Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
|
@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
@ -51,7 +50,6 @@ import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
@ -101,21 +99,6 @@ public class MockTransportService extends TransportService {
|
||||
return new MockTransportService(settings, transport, threadPool);
|
||||
}
|
||||
|
||||
public static MockTransportService nettyFromThreadPool(
|
||||
Settings settings,
|
||||
ThreadPool threadPool, final Version version) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getCurrentVersion() {
|
||||
return version;
|
||||
}
|
||||
};
|
||||
return new MockTransportService(Settings.EMPTY, transport, threadPool);
|
||||
}
|
||||
|
||||
|
||||
private final Transport original;
|
||||
|
||||
@Inject
|
||||
|
@ -24,6 +24,7 @@ import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
@ -125,15 +126,16 @@ public class InternalTestClusterTests extends ESTestCase {
|
||||
boolean masterNodes = randomBoolean();
|
||||
int minNumDataNodes = randomIntBetween(0, 3);
|
||||
int maxNumDataNodes = randomIntBetween(minNumDataNodes, 4);
|
||||
final String clusterName1 = "shared1";//clusterName("shared1", clusterSeed);
|
||||
final String clusterName2 = "shared2";//clusterName("shared", Integer.toString(CHILD_JVM_ID), clusterSeed);
|
||||
/*while (clusterName.equals(clusterName1)) {
|
||||
clusterName1 = clusterName("shared", Integer.toString(CHILD_JVM_ID), clusterSeed); // spin until the time changes
|
||||
}*/
|
||||
NodeConfigurationSource nodeConfigurationSource = NodeConfigurationSource.EMPTY;
|
||||
final String clusterName1 = "shared1";
|
||||
final String clusterName2 = "shared2";
|
||||
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).build();
|
||||
}
|
||||
};
|
||||
int numClientNodes = randomIntBetween(0, 2);
|
||||
boolean enableHttpPipelining = randomBoolean();
|
||||
int jvmOrdinal = randomIntBetween(0, 10);
|
||||
String nodePrefix = "foobar";
|
||||
|
||||
Path baseDir = createTempDir();
|
||||
@ -177,8 +179,12 @@ public class InternalTestClusterTests extends ESTestCase {
|
||||
int minNumDataNodes = 2;
|
||||
int maxNumDataNodes = 2;
|
||||
final String clusterName1 = "shared1";
|
||||
NodeConfigurationSource nodeConfigurationSource = NodeConfigurationSource.EMPTY;
|
||||
int numClientNodes = randomIntBetween(0, 2);
|
||||
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).build();
|
||||
}
|
||||
}; int numClientNodes = randomIntBetween(0, 2);
|
||||
boolean enableHttpPipelining = randomBoolean();
|
||||
String nodePrefix = "test";
|
||||
Path baseDir = createTempDir();
|
||||
@ -248,7 +254,9 @@ public class InternalTestClusterTests extends ESTestCase {
|
||||
new NodeConfigurationSource() {
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0).build();
|
||||
return Settings.builder()
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
|
||||
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user