Merge branch 'master' into rest_headers
This commit is contained in:
commit
4b9932d4a8
|
@ -59,7 +59,7 @@ public abstract class HandledTransportAction<Request extends ActionRequest<Reque
|
|||
|
||||
@Override
|
||||
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
// We already got the task created on the netty layer - no need to create it again on the transport layer
|
||||
// We already got the task created on the network layer - no need to create it again on the transport layer
|
||||
execute(task, request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
|
|
|
@ -285,7 +285,7 @@ final class Security {
|
|||
}
|
||||
|
||||
// loop through all profiles and add permissions for each one, if its valid.
|
||||
// (otherwise NettyTransport is lenient and ignores it)
|
||||
// (otherwise Netty transports are lenient and ignores it)
|
||||
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
|
||||
Settings profileSettings = entry.getValue();
|
||||
String name = entry.getKey();
|
||||
|
|
|
@ -46,20 +46,62 @@ plugin folder and point `HADOOP_HOME` variable to it; this should minimize the a
|
|||
[[repository-hdfs-config]]
|
||||
==== Configuration Properties
|
||||
|
||||
Once installed, define the configuration for the `hdfs` repository through `elasticsearch.yml` or the
|
||||
Once installed, define the configuration for the `hdfs` repository through the
|
||||
{ref}/modules-snapshots.html[REST API]:
|
||||
|
||||
[source,js]
|
||||
----
|
||||
PUT _snapshot/my_hdfs_repository
|
||||
{
|
||||
"type": "hdfs",
|
||||
"settings": {
|
||||
"uri": "hdfs://namenode:8020/",
|
||||
"path": "elasticsearch/respositories/my_hdfs_repository",
|
||||
"conf.dfs.client.read.shortcircuit": "true"
|
||||
}
|
||||
}
|
||||
----
|
||||
// CONSOLE
|
||||
// TEST[skip:we don't have hdfs set up while testing this]
|
||||
|
||||
The following settings are supported:
|
||||
|
||||
[horizontal]
|
||||
`uri`::
|
||||
|
||||
The uri address for hdfs. ex: "hdfs://<host>:<port>/". (Required)
|
||||
|
||||
`path`::
|
||||
|
||||
The file path within the filesystem where data is stored/loaded. ex: "path/to/file". (Required)
|
||||
|
||||
`load_defaults`::
|
||||
|
||||
Whether to load the default Hadoop configuration or not. (Enabled by default)
|
||||
|
||||
`conf.<key>`::
|
||||
|
||||
Inlined configuration parameter to be added to Hadoop configuration. (Optional)
|
||||
Only client oriented properties from the hadoop http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/core-default.xml[core] and http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml[hdfs] configuration files will be recognized by the plugin.
|
||||
|
||||
`compress`::
|
||||
|
||||
Whether to compress the metadata or not. (Disabled by default)
|
||||
|
||||
`chunk_size`::
|
||||
|
||||
Override the chunk size. (Disabled by default)
|
||||
|
||||
|
||||
Alternatively, you can define the `hdfs` repository and its settings in your `elasticsearch.yml`:
|
||||
[source,yaml]
|
||||
----
|
||||
repositories
|
||||
repositories:
|
||||
hdfs:
|
||||
uri: "hdfs://<host>:<port>/" \# required - HDFS address only
|
||||
path: "some/path" \# required - path within the file-system where data is stored/loaded
|
||||
load_defaults: "true" \# optional - whether to load the default Hadoop configuration (default) or not
|
||||
conf_location: "extra-cfg.xml" \# optional - Hadoop configuration XML to be loaded (use commas for multi values)
|
||||
conf.<key> : "<value>" \# optional - 'inlined' key=value added to the Hadoop configuration
|
||||
concurrent_streams: 5 \# optional - the number of concurrent streams (defaults to 5)
|
||||
compress: "false" \# optional - whether to compress the metadata or not (default)
|
||||
chunk_size: "10mb" \# optional - chunk size (disabled by default)
|
||||
|
||||
----
|
||||
|
|
|
@ -44,7 +44,7 @@ dependencies {
|
|||
compile "commons-codec:commons-codec:${versions.commonscodec}"
|
||||
compile "commons-logging:commons-logging:${versions.commonslogging}"
|
||||
// for http - testing reindex from remote
|
||||
testCompile project(path: ':modules:transport-netty', configuration: 'runtime')
|
||||
testCompile project(path: ':modules:transport-netty3', configuration: 'runtime')
|
||||
}
|
||||
|
||||
dependencyLicenses {
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.Netty3Plugin;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -62,11 +62,11 @@ public class RetryTests extends ESSingleNodeTestCase {
|
|||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return pluginList(ReindexPlugin.class, NettyPlugin.class, BogusPlugin.class); // we need netty here to http communication
|
||||
return pluginList(ReindexPlugin.class, Netty3Plugin.class, BogusPlugin.class); // we need netty here to http communication
|
||||
}
|
||||
|
||||
public static final class BogusPlugin extends Plugin {
|
||||
// se NettyPlugin.... this runs without the permission from the netty module so it will fail since reindex can't set the property
|
||||
// se Netty3Plugin.... this runs without the permission from the netty3 module so it will fail since reindex can't set the property
|
||||
// to make it still work we disable that check but need to register the setting first
|
||||
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
|
||||
Setting.Property.NodeScope);
|
||||
|
|
|
@ -19,13 +19,13 @@
|
|||
|
||||
/*
|
||||
TODOs:
|
||||
* fix permissions such that only netty can open sockets etc?
|
||||
* fix the hack in the build framework that copies transport-netty into the integ test cluster
|
||||
* maybe figure out a way to run all tests from core with netty/network?
|
||||
* fix permissions such that only netty3 can open sockets etc?
|
||||
* fix the hack in the build framework that copies transport-netty3 into the integ test cluster
|
||||
* maybe figure out a way to run all tests from core with netty3/network?
|
||||
*/
|
||||
esplugin {
|
||||
description 'Netty 3 based transport implementation'
|
||||
classname 'org.elasticsearch.transport.NettyPlugin'
|
||||
classname 'org.elasticsearch.transport.Netty3Plugin'
|
||||
hasClientJar = true
|
||||
}
|
||||
|
||||
|
@ -112,8 +112,7 @@ thirdPartyAudit.excludes = [
|
|||
'org.osgi.util.tracker.ServiceTracker',
|
||||
'org.osgi.util.tracker.ServiceTrackerCustomizer',
|
||||
|
||||
// from org.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional
|
||||
// from org.jboss.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional
|
||||
'org.slf4j.Logger',
|
||||
'org.slf4j.LoggerFactory',
|
||||
]
|
||||
|
|
@ -114,4 +114,3 @@ framework implementation, which can be obtained at:
|
|||
* license/LICENSE.felix.txt (Apache License 2.0)
|
||||
* HOMEPAGE:
|
||||
* http://felix.apache.org/
|
||||
|
|
@ -17,9 +17,9 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty3.Netty3Utils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
|
@ -32,18 +32,18 @@ import java.util.List;
|
|||
/**
|
||||
* Wraps a netty {@link HttpResponseEncoder} and makes sure that if the resulting
|
||||
* channel buffer is composite, it will use the correct gathering flag. See more
|
||||
* at {@link NettyUtils#DEFAULT_GATHERING}.
|
||||
* at {@link Netty3Utils#DEFAULT_GATHERING}.
|
||||
*/
|
||||
public class ESHttpResponseEncoder extends HttpResponseEncoder {
|
||||
public class ESNetty3HttpResponseEncoder extends HttpResponseEncoder {
|
||||
|
||||
@Override
|
||||
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
|
||||
Object retVal = super.encode(ctx, channel, msg);
|
||||
if (retVal instanceof CompositeChannelBuffer) {
|
||||
CompositeChannelBuffer ccb = (CompositeChannelBuffer) retVal;
|
||||
if (ccb.useGathering() != NettyUtils.DEFAULT_GATHERING) {
|
||||
if (ccb.useGathering() != Netty3Utils.DEFAULT_GATHERING) {
|
||||
List<ChannelBuffer> decompose = ccb.decompose(ccb.readerIndex(), ccb.readableBytes());
|
||||
return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING,
|
||||
return ChannelBuffers.wrappedBuffer(Netty3Utils.DEFAULT_GATHERING,
|
||||
decompose.toArray(new ChannelBuffer[decompose.size()]));
|
||||
}
|
||||
}
|
|
@ -17,18 +17,18 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty3.Netty3Utils;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsHandler;
|
||||
import org.elasticsearch.http.netty3.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty3.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.elasticsearch.rest.AbstractRestChannel;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -55,24 +55,24 @@ import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
|||
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
|
||||
|
||||
public final class NettyHttpChannel extends AbstractRestChannel {
|
||||
public final class Netty3HttpChannel extends AbstractRestChannel {
|
||||
|
||||
private final NettyHttpServerTransport transport;
|
||||
private final Netty3HttpServerTransport transport;
|
||||
private final Channel channel;
|
||||
private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest;
|
||||
private final OrderedUpstreamMessageEvent orderedUpstreamMessageEvent;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
/**
|
||||
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
|
||||
* @param transport The corresponding <code>Netty3HttpServerTransport</code> where this channel belongs to.
|
||||
* @param request The request that is handled by this channel.
|
||||
* @param orderedUpstreamMessageEvent If HTTP pipelining is enabled provide the corresponding Netty upstream event. May be null if
|
||||
* HTTP pipelining is disabled.
|
||||
* @param detailedErrorsEnabled true iff error messages should include stack traces.
|
||||
*/
|
||||
public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request,
|
||||
@Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent,
|
||||
boolean detailedErrorsEnabled, ThreadContext threadContext) {
|
||||
public Netty3HttpChannel(Netty3HttpServerTransport transport, Netty3HttpRequest request,
|
||||
@Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent,
|
||||
boolean detailedErrorsEnabled, ThreadContext threadContext) {
|
||||
super(request, detailedErrorsEnabled);
|
||||
this.transport = transport;
|
||||
this.channel = request.getChannel();
|
||||
|
@ -93,7 +93,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
HttpResponse resp = newResponse();
|
||||
resp.setStatus(getStatus(response.status()));
|
||||
|
||||
CorsHandler.setCorsResponseHeaders(nettyRequest, resp, transport.getCorsConfig());
|
||||
Netty3CorsHandler.setCorsResponseHeaders(nettyRequest, resp, transport.getCorsConfig());
|
||||
|
||||
String opaque = nettyRequest.headers().get("X-Opaque-Id");
|
||||
if (opaque != null) {
|
||||
|
@ -108,7 +108,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
ChannelBuffer buffer;
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
buffer = NettyUtils.toChannelBuffer(content);
|
||||
buffer = Netty3Utils.toChannelBuffer(content);
|
||||
resp.setContent(buffer);
|
||||
|
||||
// If our response doesn't specify a content-type header, set one
|
|
@ -17,11 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty3.Netty3Utils;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.support.RestUtils;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
@ -34,7 +34,7 @@ import java.util.Map;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class NettyHttpRequest extends RestRequest {
|
||||
public class Netty3HttpRequest extends RestRequest {
|
||||
|
||||
private final org.jboss.netty.handler.codec.http.HttpRequest request;
|
||||
private final Channel channel;
|
||||
|
@ -42,12 +42,12 @@ public class NettyHttpRequest extends RestRequest {
|
|||
private final String rawPath;
|
||||
private final BytesReference content;
|
||||
|
||||
public NettyHttpRequest(org.jboss.netty.handler.codec.http.HttpRequest request, Channel channel) {
|
||||
public Netty3HttpRequest(org.jboss.netty.handler.codec.http.HttpRequest request, Channel channel) {
|
||||
this.request = request;
|
||||
this.channel = channel;
|
||||
this.params = new HashMap<>();
|
||||
if (request.getContent().readable()) {
|
||||
this.content = NettyUtils.toBytesReference(request.getContent());
|
||||
this.content = Netty3Utils.toBytesReference(request.getContent());
|
||||
} else {
|
||||
this.content = BytesArray.EMPTY;
|
||||
}
|
|
@ -17,10 +17,10 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.elasticsearch.http.netty3.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.jboss.netty.channel.ChannelHandler;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
|
@ -28,18 +28,15 @@ import org.jboss.netty.channel.MessageEvent;
|
|||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.handler.codec.http.HttpRequest;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
|
||||
public class Netty3HttpRequestHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final NettyHttpServerTransport serverTransport;
|
||||
private final Netty3HttpServerTransport serverTransport;
|
||||
private final boolean httpPipeliningEnabled;
|
||||
private final boolean detailedErrorsEnabled;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
public HttpRequestHandler(NettyHttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
|
||||
public Netty3HttpRequestHandler(Netty3HttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
|
||||
this.serverTransport = serverTransport;
|
||||
this.httpPipeliningEnabled = serverTransport.pipelining;
|
||||
this.detailedErrorsEnabled = detailedErrorsEnabled;
|
||||
|
@ -60,8 +57,8 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
|
|||
threadContext.copyHeaders(request.headers());
|
||||
// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
|
||||
// when reading, or using a cumalation buffer
|
||||
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
|
||||
NettyHttpChannel channel = new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled, threadContext);
|
||||
Netty3HttpRequest httpRequest = new Netty3HttpRequest(request, e.getChannel());
|
||||
Netty3HttpChannel channel = new Netty3HttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled, threadContext);
|
||||
serverTransport.dispatchRequest(httpRequest, channel);
|
||||
super.messageReceived(ctx, e);
|
||||
}
|
|
@ -17,15 +17,13 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import com.carrotsearch.hppc.IntHashSet;
|
||||
import com.carrotsearch.hppc.IntSet;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -46,16 +44,18 @@ import org.elasticsearch.http.HttpInfo;
|
|||
import org.elasticsearch.http.HttpServerAdapter;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.http.HttpStats;
|
||||
import org.elasticsearch.http.netty.cors.CorsConfig;
|
||||
import org.elasticsearch.http.netty.cors.CorsConfigBuilder;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsConfig;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsConfigBuilder;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsHandler;
|
||||
import org.elasticsearch.http.netty3.pipelining.HttpPipeliningHandler;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.support.RestUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.netty3.Netty3OpenChannelsHandler;
|
||||
import org.elasticsearch.transport.netty3.Netty3Utils;
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
@ -108,12 +108,12 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_
|
|||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
|
||||
import static org.elasticsearch.http.netty.cors.CorsHandler.ANY_ORIGIN;
|
||||
import static org.elasticsearch.http.netty3.cors.Netty3CorsHandler.ANY_ORIGIN;
|
||||
|
||||
public class NettyHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
|
||||
public class Netty3HttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
|
||||
|
||||
static {
|
||||
NettyUtils.setup();
|
||||
Netty3Utils.setup();
|
||||
}
|
||||
|
||||
public static Setting<ByteSizeValue> SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY =
|
||||
|
@ -208,14 +208,14 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
|||
protected volatile List<Channel> serverChannels = new ArrayList<>();
|
||||
|
||||
// package private for testing
|
||||
OpenChannelsHandler serverOpenChannels;
|
||||
Netty3OpenChannelsHandler serverOpenChannels;
|
||||
|
||||
protected volatile HttpServerAdapter httpServerAdapter;
|
||||
|
||||
private final CorsConfig corsConfig;
|
||||
private final Netty3CorsConfig corsConfig;
|
||||
|
||||
@Inject
|
||||
public NettyHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool) {
|
||||
public Netty3HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.networkService = networkService;
|
||||
this.bigArrays = bigArrays;
|
||||
|
@ -280,7 +280,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
|||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.serverOpenChannels = new OpenChannelsHandler(logger);
|
||||
this.serverOpenChannels = new Netty3OpenChannelsHandler(logger);
|
||||
if (blockingServer) {
|
||||
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
|
||||
|
@ -369,22 +369,22 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
|||
return publishPort;
|
||||
}
|
||||
|
||||
private CorsConfig buildCorsConfig(Settings settings) {
|
||||
private Netty3CorsConfig buildCorsConfig(Settings settings) {
|
||||
if (SETTING_CORS_ENABLED.get(settings) == false) {
|
||||
return CorsConfigBuilder.forOrigins().disable().build();
|
||||
return Netty3CorsConfigBuilder.forOrigins().disable().build();
|
||||
}
|
||||
String origin = SETTING_CORS_ALLOW_ORIGIN.get(settings);
|
||||
final CorsConfigBuilder builder;
|
||||
final Netty3CorsConfigBuilder builder;
|
||||
if (Strings.isNullOrEmpty(origin)) {
|
||||
builder = CorsConfigBuilder.forOrigins();
|
||||
builder = Netty3CorsConfigBuilder.forOrigins();
|
||||
} else if (origin.equals(ANY_ORIGIN)) {
|
||||
builder = CorsConfigBuilder.forAnyOrigin();
|
||||
builder = Netty3CorsConfigBuilder.forAnyOrigin();
|
||||
} else {
|
||||
Pattern p = RestUtils.checkCorsSettingForRegex(origin);
|
||||
if (p == null) {
|
||||
builder = CorsConfigBuilder.forOrigins(RestUtils.corsSettingAsArray(origin));
|
||||
builder = Netty3CorsConfigBuilder.forOrigins(RestUtils.corsSettingAsArray(origin));
|
||||
} else {
|
||||
builder = CorsConfigBuilder.forPattern(p);
|
||||
builder = Netty3CorsConfigBuilder.forPattern(p);
|
||||
}
|
||||
}
|
||||
if (SETTING_CORS_ALLOW_CREDENTIALS.get(settings)) {
|
||||
|
@ -473,11 +473,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
|||
|
||||
@Override
|
||||
public HttpStats stats() {
|
||||
OpenChannelsHandler channels = serverOpenChannels;
|
||||
Netty3OpenChannelsHandler channels = serverOpenChannels;
|
||||
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
|
||||
}
|
||||
|
||||
public CorsConfig getCorsConfig() {
|
||||
public Netty3CorsConfig getCorsConfig() {
|
||||
return corsConfig;
|
||||
}
|
||||
|
||||
|
@ -512,12 +512,12 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
|||
|
||||
protected static class HttpChannelPipelineFactory implements ChannelPipelineFactory {
|
||||
|
||||
protected final NettyHttpServerTransport transport;
|
||||
protected final HttpRequestHandler requestHandler;
|
||||
protected final Netty3HttpServerTransport transport;
|
||||
protected final Netty3HttpRequestHandler requestHandler;
|
||||
|
||||
public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
|
||||
public HttpChannelPipelineFactory(Netty3HttpServerTransport transport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
|
||||
this.transport = transport;
|
||||
this.requestHandler = new HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
|
||||
this.requestHandler = new Netty3HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -546,12 +546,12 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent impleme
|
|||
httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
|
||||
}
|
||||
pipeline.addLast("aggregator", httpChunkAggregator);
|
||||
pipeline.addLast("encoder", new ESHttpResponseEncoder());
|
||||
pipeline.addLast("encoder", new ESNetty3HttpResponseEncoder());
|
||||
if (transport.compression) {
|
||||
pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
|
||||
}
|
||||
if (SETTING_CORS_ENABLED.get(transport.settings())) {
|
||||
pipeline.addLast("cors", new CorsHandler(transport.getCorsConfig()));
|
||||
pipeline.addLast("cors", new Netty3CorsHandler(transport.getCorsConfig()));
|
||||
}
|
||||
if (transport.pipelining) {
|
||||
pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty.cors;
|
||||
package org.elasticsearch.http.netty3.cors;
|
||||
|
||||
import org.jboss.netty.handler.codec.http.DefaultHttpHeaders;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
|
@ -37,7 +37,7 @@ import java.util.regex.Pattern;
|
|||
* This class was lifted from the Netty project:
|
||||
* https://github.com/netty/netty
|
||||
*/
|
||||
public final class CorsConfig {
|
||||
public final class Netty3CorsConfig {
|
||||
|
||||
private final Optional<Set<String>> origins;
|
||||
private final Optional<Pattern> pattern;
|
||||
|
@ -51,7 +51,7 @@ public final class CorsConfig {
|
|||
private final Map<CharSequence, Callable<?>> preflightHeaders;
|
||||
private final boolean shortCircuit;
|
||||
|
||||
CorsConfig(final CorsConfigBuilder builder) {
|
||||
Netty3CorsConfig(final Netty3CorsConfigBuilder builder) {
|
||||
origins = builder.origins.map(s -> new LinkedHashSet<>(s));
|
||||
pattern = builder.pattern;
|
||||
anyOrigin = builder.anyOrigin;
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty.cors;
|
||||
package org.elasticsearch.http.netty3.cors;
|
||||
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
|
||||
|
@ -33,55 +33,55 @@ import java.util.concurrent.Callable;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Builder used to configure and build a {@link CorsConfig} instance.
|
||||
* Builder used to configure and build a {@link Netty3CorsConfig} instance.
|
||||
*
|
||||
* This class was lifted from the Netty project:
|
||||
* https://github.com/netty/netty
|
||||
*/
|
||||
public final class CorsConfigBuilder {
|
||||
public final class Netty3CorsConfigBuilder {
|
||||
|
||||
/**
|
||||
* Creates a Builder instance with it's origin set to '*'.
|
||||
*
|
||||
* @return Builder to support method chaining.
|
||||
*/
|
||||
public static CorsConfigBuilder forAnyOrigin() {
|
||||
return new CorsConfigBuilder();
|
||||
public static Netty3CorsConfigBuilder forAnyOrigin() {
|
||||
return new Netty3CorsConfigBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link CorsConfigBuilder} instance with the specified origin.
|
||||
* Creates a {@link Netty3CorsConfigBuilder} instance with the specified origin.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public static CorsConfigBuilder forOrigin(final String origin) {
|
||||
public static Netty3CorsConfigBuilder forOrigin(final String origin) {
|
||||
if ("*".equals(origin)) {
|
||||
return new CorsConfigBuilder();
|
||||
return new Netty3CorsConfigBuilder();
|
||||
}
|
||||
return new CorsConfigBuilder(origin);
|
||||
return new Netty3CorsConfigBuilder(origin);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a {@link CorsConfigBuilder} instance with the specified pattern origin.
|
||||
* Create a {@link Netty3CorsConfigBuilder} instance with the specified pattern origin.
|
||||
*
|
||||
* @param pattern the regular expression pattern to match incoming origins on.
|
||||
* @return {@link CorsConfigBuilder} with the configured origin pattern.
|
||||
* @return {@link Netty3CorsConfigBuilder} with the configured origin pattern.
|
||||
*/
|
||||
public static CorsConfigBuilder forPattern(final Pattern pattern) {
|
||||
public static Netty3CorsConfigBuilder forPattern(final Pattern pattern) {
|
||||
if (pattern == null) {
|
||||
throw new IllegalArgumentException("CORS pattern cannot be null");
|
||||
}
|
||||
return new CorsConfigBuilder(pattern);
|
||||
return new Netty3CorsConfigBuilder(pattern);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link CorsConfigBuilder} instance with the specified origins.
|
||||
* Creates a {@link Netty3CorsConfigBuilder} instance with the specified origins.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public static CorsConfigBuilder forOrigins(final String... origins) {
|
||||
return new CorsConfigBuilder(origins);
|
||||
public static Netty3CorsConfigBuilder forOrigins(final String... origins) {
|
||||
return new Netty3CorsConfigBuilder(origins);
|
||||
}
|
||||
|
||||
Optional<Set<String>> origins;
|
||||
|
@ -102,7 +102,7 @@ public final class CorsConfigBuilder {
|
|||
*
|
||||
* @param origins the origin to be used for this builder.
|
||||
*/
|
||||
CorsConfigBuilder(final String... origins) {
|
||||
Netty3CorsConfigBuilder(final String... origins) {
|
||||
this.origins = Optional.of(new LinkedHashSet<>(Arrays.asList(origins)));
|
||||
pattern = Optional.empty();
|
||||
anyOrigin = false;
|
||||
|
@ -113,7 +113,7 @@ public final class CorsConfigBuilder {
|
|||
* wildcard origin.
|
||||
*
|
||||
*/
|
||||
CorsConfigBuilder() {
|
||||
Netty3CorsConfigBuilder() {
|
||||
anyOrigin = true;
|
||||
origins = Optional.empty();
|
||||
pattern = Optional.empty();
|
||||
|
@ -124,7 +124,7 @@ public final class CorsConfigBuilder {
|
|||
*
|
||||
* @param pattern the pattern to match against for incoming origins.
|
||||
*/
|
||||
CorsConfigBuilder(final Pattern pattern) {
|
||||
Netty3CorsConfigBuilder(final Pattern pattern) {
|
||||
this.pattern = Optional.of(pattern);
|
||||
origins = Optional.empty();
|
||||
anyOrigin = false;
|
||||
|
@ -135,9 +135,9 @@ public final class CorsConfigBuilder {
|
|||
* from the local file system. Calling this method will enable a successful CORS response
|
||||
* with a wildcard for the CORS response header 'Access-Control-Allow-Origin'.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
CorsConfigBuilder allowNullOrigin() {
|
||||
Netty3CorsConfigBuilder allowNullOrigin() {
|
||||
allowNullOrigin = true;
|
||||
return this;
|
||||
}
|
||||
|
@ -145,9 +145,9 @@ public final class CorsConfigBuilder {
|
|||
/**
|
||||
* Disables CORS support.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder disable() {
|
||||
public Netty3CorsConfigBuilder disable() {
|
||||
enabled = false;
|
||||
return this;
|
||||
}
|
||||
|
@ -165,9 +165,9 @@ public final class CorsConfigBuilder {
|
|||
* The default value for 'withCredentials' is false in which case no cookies are sent.
|
||||
* Setting this to true will included cookies in cross origin requests.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder allowCredentials() {
|
||||
public Netty3CorsConfigBuilder allowCredentials() {
|
||||
allowCredentials = true;
|
||||
return this;
|
||||
}
|
||||
|
@ -179,9 +179,9 @@ public final class CorsConfigBuilder {
|
|||
* request will be made.
|
||||
*
|
||||
* @param max the maximum time, in seconds, that the preflight response may be cached.
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder maxAge(final long max) {
|
||||
public Netty3CorsConfigBuilder maxAge(final long max) {
|
||||
maxAge = max;
|
||||
return this;
|
||||
}
|
||||
|
@ -191,9 +191,9 @@ public final class CorsConfigBuilder {
|
|||
* CORS 'Access-Control-Request-Method' response header.
|
||||
*
|
||||
* @param methods the {@link HttpMethod}s that should be allowed.
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder allowedRequestMethods(final HttpMethod... methods) {
|
||||
public Netty3CorsConfigBuilder allowedRequestMethods(final HttpMethod... methods) {
|
||||
requestMethods.addAll(Arrays.asList(methods));
|
||||
return this;
|
||||
}
|
||||
|
@ -212,9 +212,9 @@ public final class CorsConfigBuilder {
|
|||
* if it allow a request).
|
||||
*
|
||||
* @param headers the headers to be added to the preflight 'Access-Control-Allow-Headers' response header.
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder allowedRequestHeaders(final String... headers) {
|
||||
public Netty3CorsConfigBuilder allowedRequestHeaders(final String... headers) {
|
||||
requestHeaders.addAll(Arrays.asList(headers));
|
||||
return this;
|
||||
}
|
||||
|
@ -227,9 +227,9 @@ public final class CorsConfigBuilder {
|
|||
*
|
||||
* @param name the name of the HTTP header.
|
||||
* @param values the values for the HTTP header.
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder preflightResponseHeader(final CharSequence name, final Object... values) {
|
||||
public Netty3CorsConfigBuilder preflightResponseHeader(final CharSequence name, final Object... values) {
|
||||
if (values.length == 1) {
|
||||
preflightHeaders.put(name, new ConstantValueGenerator(values[0]));
|
||||
} else {
|
||||
|
@ -247,9 +247,9 @@ public final class CorsConfigBuilder {
|
|||
* @param name the name of the HTTP header.
|
||||
* @param value the values for the HTTP header.
|
||||
* @param <T> the type of values that the Iterable contains.
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public <T> CorsConfigBuilder preflightResponseHeader(final CharSequence name, final Iterable<T> value) {
|
||||
public <T> Netty3CorsConfigBuilder preflightResponseHeader(final CharSequence name, final Iterable<T> value) {
|
||||
preflightHeaders.put(name, new ConstantValueGenerator(value));
|
||||
return this;
|
||||
}
|
||||
|
@ -267,9 +267,9 @@ public final class CorsConfigBuilder {
|
|||
* @param name the name of the HTTP header.
|
||||
* @param valueGenerator a Callable which will be invoked at HTTP response creation.
|
||||
* @param <T> the type of the value that the Callable can return.
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public <T> CorsConfigBuilder preflightResponseHeader(final CharSequence name, final Callable<T> valueGenerator) {
|
||||
public <T> Netty3CorsConfigBuilder preflightResponseHeader(final CharSequence name, final Callable<T> valueGenerator) {
|
||||
preflightHeaders.put(name, valueGenerator);
|
||||
return this;
|
||||
}
|
||||
|
@ -277,9 +277,9 @@ public final class CorsConfigBuilder {
|
|||
/**
|
||||
* Specifies that no preflight response headers should be added to a preflight response.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder noPreflightResponseHeaders() {
|
||||
public Netty3CorsConfigBuilder noPreflightResponseHeaders() {
|
||||
noPreflightHeaders = true;
|
||||
return this;
|
||||
}
|
||||
|
@ -292,24 +292,24 @@ public final class CorsConfigBuilder {
|
|||
* and this setting will check that the Origin is valid and if it is not valid no
|
||||
* further processing will take place, and a error will be returned to the calling client.
|
||||
*
|
||||
* @return {@link CorsConfigBuilder} to support method chaining.
|
||||
* @return {@link Netty3CorsConfigBuilder} to support method chaining.
|
||||
*/
|
||||
public CorsConfigBuilder shortCircuit() {
|
||||
public Netty3CorsConfigBuilder shortCircuit() {
|
||||
shortCircuit = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a {@link CorsConfig} with settings specified by previous method calls.
|
||||
* Builds a {@link Netty3CorsConfig} with settings specified by previous method calls.
|
||||
*
|
||||
* @return {@link CorsConfig} the configured CorsConfig instance.
|
||||
* @return {@link Netty3CorsConfig} the configured CorsConfig instance.
|
||||
*/
|
||||
public CorsConfig build() {
|
||||
public Netty3CorsConfig build() {
|
||||
if (preflightHeaders.isEmpty() && !noPreflightHeaders) {
|
||||
preflightHeaders.put("date", DateValueGenerator.INSTANCE);
|
||||
preflightHeaders.put("content-length", new ConstantValueGenerator("0"));
|
||||
}
|
||||
return new CorsConfig(this);
|
||||
return new Netty3CorsConfig(this);
|
||||
}
|
||||
|
||||
/**
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty.cors;
|
||||
package org.elasticsearch.http.netty3.cors;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
@ -47,23 +47,23 @@ import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
|
|||
/**
|
||||
* Handles <a href="http://www.w3.org/TR/cors/">Cross Origin Resource Sharing</a> (CORS) requests.
|
||||
* <p>
|
||||
* This handler can be configured using a {@link CorsConfig}, please
|
||||
* This handler can be configured using a {@link Netty3CorsConfig}, please
|
||||
* refer to this class for details about the configuration options available.
|
||||
*
|
||||
* This code was borrowed from Netty 4 and refactored to work for Elasticsearch's Netty 3 setup.
|
||||
*/
|
||||
public class CorsHandler extends SimpleChannelUpstreamHandler {
|
||||
public class Netty3CorsHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
public static final String ANY_ORIGIN = "*";
|
||||
private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://");
|
||||
|
||||
private final CorsConfig config;
|
||||
private final Netty3CorsConfig config;
|
||||
private HttpRequest request;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@link CorsConfig}.
|
||||
* Creates a new instance with the specified {@link Netty3CorsConfig}.
|
||||
*/
|
||||
public CorsHandler(final CorsConfig config) {
|
||||
public Netty3CorsHandler(final Netty3CorsConfig config) {
|
||||
if (config == null) {
|
||||
throw new IllegalArgumentException("Config cannot be null");
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ public class CorsHandler extends SimpleChannelUpstreamHandler {
|
|||
super.messageReceived(ctx, e);
|
||||
}
|
||||
|
||||
public static void setCorsResponseHeaders(HttpRequest request, HttpResponse resp, CorsConfig config) {
|
||||
public static void setCorsResponseHeaders(HttpRequest request, HttpResponse resp, Netty3CorsConfig config) {
|
||||
if (!config.isCorsSupportEnabled()) {
|
||||
return;
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package org.elasticsearch.http.netty.pipelining;
|
||||
package org.elasticsearch.http.netty3.pipelining;
|
||||
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
|
@ -1,4 +1,4 @@
|
|||
package org.elasticsearch.http.netty.pipelining;
|
||||
package org.elasticsearch.http.netty3.pipelining;
|
||||
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
|
@ -1,4 +1,4 @@
|
|||
package org.elasticsearch.http.netty.pipelining;
|
||||
package org.elasticsearch.http.netty3.pipelining;
|
||||
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
|
@ -23,20 +23,20 @@ import org.elasticsearch.SpecialPermission;
|
|||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||
import org.elasticsearch.http.netty3.Netty3HttpServerTransport;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.elasticsearch.transport.netty3.Netty3Transport;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class NettyPlugin extends Plugin {
|
||||
public static final String NETTY_TRANSPORT_NAME = "netty";
|
||||
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty";
|
||||
public class Netty3Plugin extends Plugin {
|
||||
public static final String NETTY_TRANSPORT_NAME = "netty3";
|
||||
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty3";
|
||||
|
||||
public NettyPlugin(Settings settings) {
|
||||
public Netty3Plugin(Settings settings) {
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
if (sm != null) {
|
||||
sm.checkPermission(new SpecialPermission());
|
||||
|
@ -64,29 +64,29 @@ public class NettyPlugin extends Plugin {
|
|||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyHttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyHttpServerTransport.SETTING_HTTP_WORKER_COUNT,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
|
||||
NettyHttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
|
||||
NettyTransport.WORKER_COUNT,
|
||||
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MIN,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NettyTransport.NETTY_BOSS_COUNT
|
||||
Netty3HttpServerTransport.SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_WORKER_COUNT,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
|
||||
Netty3HttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
|
||||
Netty3Transport.WORKER_COUNT,
|
||||
Netty3Transport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
Netty3Transport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
Netty3Transport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
Netty3Transport.NETTY_RECEIVE_PREDICTOR_MIN,
|
||||
Netty3Transport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
Netty3Transport.NETTY_BOSS_COUNT
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings additionalSettings() {
|
||||
return Settings.builder()
|
||||
// here we set the netty transport and http transport as the default. This is a set once setting
|
||||
// here we set the netty3 transport and http transport as the default. This is a set once setting
|
||||
// ie. if another plugin does that as well the server will fail - only one default network can exist!
|
||||
.put(NetworkModule.HTTP_DEFAULT_TYPE_SETTING.getKey(), NETTY_HTTP_TRANSPORT_NAME)
|
||||
.put(NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING.getKey(), NETTY_TRANSPORT_NAME)
|
||||
|
@ -95,8 +95,8 @@ public class NettyPlugin extends Plugin {
|
|||
|
||||
public void onModule(NetworkModule networkModule) {
|
||||
if (networkModule.canRegisterHttpExtensions()) {
|
||||
networkModule.registerHttpTransport(NETTY_HTTP_TRANSPORT_NAME, NettyHttpServerTransport.class);
|
||||
networkModule.registerHttpTransport(NETTY_HTTP_TRANSPORT_NAME, Netty3HttpServerTransport.class);
|
||||
}
|
||||
networkModule.registerTransport(NETTY_TRANSPORT_NAME, NettyTransport.class);
|
||||
networkModule.registerTransport(NETTY_TRANSPORT_NAME, Netty3Transport.class);
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -47,7 +47,7 @@ class ChannelBufferStreamInput extends StreamInput {
|
|||
|
||||
@Override
|
||||
public BytesReference readBytesReference(int length) throws IOException {
|
||||
BytesReference ref = NettyUtils.toBytesReference(buffer.slice(buffer.readerIndex(), length));
|
||||
BytesReference ref = Netty3Utils.toBytesReference(buffer.slice(buffer.readerIndex(), length));
|
||||
buffer.skipBytes(length);
|
||||
return ref;
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.common.SuppressLoggerChecks;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
|
@ -27,11 +27,11 @@ import org.jboss.netty.logging.AbstractInternalLogger;
|
|||
*
|
||||
*/
|
||||
@SuppressLoggerChecks(reason = "safely delegates to logger")
|
||||
final class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
final class Netty3InternalESLogger extends AbstractInternalLogger {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
NettyInternalESLogger(ESLogger logger) {
|
||||
Netty3InternalESLogger(ESLogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
|
@ -17,12 +17,10 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TcpTransportChannel;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
@ -38,13 +36,13 @@ import java.net.InetSocketAddress;
|
|||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
class NettyMessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
class Netty3MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final Netty3Transport transport;
|
||||
protected final String profileName;
|
||||
|
||||
NettyMessageChannelHandler(NettyTransport transport, String profileName) {
|
||||
Netty3MessageChannelHandler(Netty3Transport transport, String profileName) {
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.profileName = profileName;
|
||||
|
@ -71,7 +69,7 @@ class NettyMessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
try {
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size
|
||||
BytesReference reference = NettyUtils.toBytesReference(buffer, remainingMessageSize);
|
||||
BytesReference reference = Netty3Utils.toBytesReference(buffer, remainingMessageSize);
|
||||
transport.messageReceived(reference, ctx.getChannel(), profileName, remoteAddress, remainingMessageSize);
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
|
@ -33,14 +33,10 @@ import org.jboss.netty.channel.ChannelState;
|
|||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
|
||||
public class Netty3OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
|
||||
|
||||
final Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
|
||||
final CounterMetric openChannelsMetric = new CounterMetric();
|
||||
|
@ -48,7 +44,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
|
|||
|
||||
final ESLogger logger;
|
||||
|
||||
public OpenChannelsHandler(ESLogger logger) {
|
||||
public Netty3OpenChannelsHandler(ESLogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
@ -27,14 +27,12 @@ import org.jboss.netty.channel.ChannelHandlerContext;
|
|||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
|
||||
/**
|
||||
*/
|
||||
final class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
final class Netty3SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
try {
|
||||
boolean continueProcessing = TcpTransport.validateMessageHeader(NettyUtils.toBytesReference(buffer));
|
||||
boolean continueProcessing = TcpTransport.validateMessageHeader(Netty3Utils.toBytesReference(buffer));
|
||||
buffer.skipBytes(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
return continueProcessing ? buffer : null;
|
||||
} catch (IllegalArgumentException ex) {
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -84,10 +84,10 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
|
|||
* longer. Med is for the typical search / single doc index. And High for things like cluster state. Ping is reserved for
|
||||
* sending out ping requests to other nodes.
|
||||
*/
|
||||
public class NettyTransport extends TcpTransport<Channel> {
|
||||
public class Netty3Transport extends TcpTransport<Channel> {
|
||||
|
||||
static {
|
||||
NettyUtils.setup();
|
||||
Netty3Utils.setup();
|
||||
}
|
||||
|
||||
public static final Setting<Integer> WORKER_COUNT =
|
||||
|
@ -127,14 +127,14 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
protected final ByteSizeValue receivePredictorMin;
|
||||
protected final ByteSizeValue receivePredictorMax;
|
||||
// package private for testing
|
||||
volatile OpenChannelsHandler serverOpenChannels;
|
||||
volatile Netty3OpenChannelsHandler serverOpenChannels;
|
||||
protected volatile ClientBootstrap clientBootstrap;
|
||||
protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
|
||||
|
||||
@Inject
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
public Netty3Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
super("netty3", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
this.workerCount = WORKER_COUNT.get(settings);
|
||||
this.maxCumulationBufferCapacity = NETTY_MAX_CUMULATION_BUFFER_CAPACITY.get(settings);
|
||||
this.maxCompositeBufferComponents = NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
|
||||
|
@ -160,7 +160,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
try {
|
||||
clientBootstrap = createClientBootstrap();
|
||||
if (NetworkService.NETWORK_SERVER.get(settings)) {
|
||||
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
|
||||
final Netty3OpenChannelsHandler openChannels = new Netty3OpenChannelsHandler(logger);
|
||||
this.serverOpenChannels = openChannels;
|
||||
// loop through all profiles and start them up, special handling for default one
|
||||
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
|
||||
|
@ -322,7 +322,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
|
||||
@Override
|
||||
public long serverOpen() {
|
||||
OpenChannelsHandler channels = serverOpenChannels;
|
||||
Netty3OpenChannelsHandler channels = serverOpenChannels;
|
||||
return channels == null ? 0 : channels.numberOfOpenChannels();
|
||||
}
|
||||
|
||||
|
@ -402,16 +402,16 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
}
|
||||
|
||||
protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory {
|
||||
protected final NettyTransport nettyTransport;
|
||||
protected final Netty3Transport nettyTransport;
|
||||
|
||||
public ClientChannelPipelineFactory(NettyTransport nettyTransport) {
|
||||
public ClientChannelPipelineFactory(Netty3Transport nettyTransport) {
|
||||
this.nettyTransport = nettyTransport;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline channelPipeline = Channels.pipeline();
|
||||
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
|
||||
Netty3SizeHeaderFrameDecoder sizeHeader = new Netty3SizeHeaderFrameDecoder();
|
||||
if (nettyTransport.maxCumulationBufferCapacity.bytes() >= 0) {
|
||||
if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
|
||||
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
|
||||
|
@ -424,7 +424,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
}
|
||||
channelPipeline.addLast("size", sizeHeader);
|
||||
// using a dot as a prefix means, this cannot come from any settings parsed
|
||||
channelPipeline.addLast("dispatcher", new NettyMessageChannelHandler(nettyTransport, ".client"));
|
||||
channelPipeline.addLast("dispatcher", new Netty3MessageChannelHandler(nettyTransport, ".client"));
|
||||
return channelPipeline;
|
||||
}
|
||||
}
|
||||
|
@ -435,11 +435,11 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
|
||||
protected static class ServerChannelPipelineFactory implements ChannelPipelineFactory {
|
||||
|
||||
protected final NettyTransport nettyTransport;
|
||||
protected final Netty3Transport nettyTransport;
|
||||
protected final String name;
|
||||
protected final Settings settings;
|
||||
|
||||
public ServerChannelPipelineFactory(NettyTransport nettyTransport, String name, Settings settings) {
|
||||
public ServerChannelPipelineFactory(Netty3Transport nettyTransport, String name, Settings settings) {
|
||||
this.nettyTransport = nettyTransport;
|
||||
this.name = name;
|
||||
this.settings = settings;
|
||||
|
@ -449,7 +449,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline channelPipeline = Channels.pipeline();
|
||||
channelPipeline.addLast("openChannels", nettyTransport.serverOpenChannels);
|
||||
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
|
||||
Netty3SizeHeaderFrameDecoder sizeHeader = new Netty3SizeHeaderFrameDecoder();
|
||||
if (nettyTransport.maxCumulationBufferCapacity.bytes() > 0) {
|
||||
if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
|
||||
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
|
||||
|
@ -461,7 +461,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
|
||||
}
|
||||
channelPipeline.addLast("size", sizeHeader);
|
||||
channelPipeline.addLast("dispatcher", new NettyMessageChannelHandler(nettyTransport, name));
|
||||
channelPipeline.addLast("dispatcher", new Netty3MessageChannelHandler(nettyTransport, name));
|
||||
return channelPipeline;
|
||||
}
|
||||
}
|
||||
|
@ -487,7 +487,7 @@ public class NettyTransport extends TcpTransport<Channel> {
|
|||
|
||||
@Override
|
||||
protected void sendMessage(Channel channel, BytesReference reference, Runnable sendListener, boolean close) {
|
||||
final ChannelFuture future = channel.write(NettyUtils.toChannelBuffer(reference));
|
||||
final ChannelFuture future = channel.write(Netty3Utils.toChannelBuffer(reference));
|
||||
if (close) {
|
||||
future.addListener(f -> {
|
||||
try {
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
|
@ -34,7 +34,7 @@ import java.util.ArrayList;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class NettyUtils {
|
||||
public class Netty3Utils {
|
||||
|
||||
/**
|
||||
* Here we go....
|
||||
|
@ -96,8 +96,8 @@ public class NettyUtils {
|
|||
InternalLoggerFactory.setDefaultFactory(new InternalLoggerFactory() {
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
name = name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty.");
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
name = name.replace("org.jboss.netty.", "netty3.").replace("org.jboss.netty.", "netty3.");
|
||||
return new Netty3InternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
});
|
||||
|
|
@ -22,13 +22,13 @@ import org.elasticsearch.common.network.NetworkModule;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.elasticsearch.transport.Netty3Plugin;
|
||||
import org.elasticsearch.transport.netty3.Netty3Transport;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@ESIntegTestCase.SuppressLocalMode
|
||||
public abstract class ESNettyIntegTestCase extends ESIntegTestCase {
|
||||
public abstract class ESNetty3IntegTestCase extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected boolean ignoreExternalCluster() {
|
||||
|
@ -45,28 +45,28 @@ public abstract class ESNettyIntegTestCase extends ESIntegTestCase {
|
|||
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
|
||||
// randomize netty settings
|
||||
if (randomBoolean()) {
|
||||
builder.put(NettyTransport.WORKER_COUNT.getKey(), random().nextInt(3) + 1);
|
||||
builder.put(Netty3Transport.WORKER_COUNT.getKey(), random().nextInt(3) + 1);
|
||||
}
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty");
|
||||
builder.put(NetworkModule.HTTP_TYPE_KEY, "netty");
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3");
|
||||
builder.put(NetworkModule.HTTP_TYPE_KEY, "netty3");
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings transportClientSettings() {
|
||||
Settings.Builder builder = Settings.builder().put(super.transportClientSettings());
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty");
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3");
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(NettyPlugin.class);
|
||||
return pluginList(Netty3Plugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return pluginList(NettyPlugin.class);
|
||||
return pluginList(Netty3Plugin.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -25,7 +25,7 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.http.HttpTransportSettings;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsHandler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -61,12 +61,12 @@ import static org.hamcrest.Matchers.is;
|
|||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class NettyHttpChannelTests extends ESTestCase {
|
||||
public class Netty3HttpChannelTests extends ESTestCase {
|
||||
|
||||
private NetworkService networkService;
|
||||
private ThreadPool threadPool;
|
||||
private MockBigArrays bigArrays;
|
||||
private NettyHttpServerTransport httpServerTransport;
|
||||
private Netty3HttpServerTransport httpServerTransport;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
@ -159,7 +159,7 @@ public class NettyHttpChannelTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testThatAnyOriginWorks() {
|
||||
final String originValue = CorsHandler.ANY_ORIGIN;
|
||||
final String originValue = Netty3CorsHandler.ANY_ORIGIN;
|
||||
Settings settings = Settings.builder()
|
||||
.put(SETTING_CORS_ENABLED.getKey(), true)
|
||||
.put(SETTING_CORS_ALLOW_ORIGIN.getKey(), originValue)
|
||||
|
@ -174,14 +174,15 @@ public class NettyHttpChannelTests extends ESTestCase {
|
|||
|
||||
public void testHeadersSet() {
|
||||
Settings settings = Settings.builder().build();
|
||||
httpServerTransport = new NettyHttpServerTransport(settings, networkService, bigArrays, threadPool);
|
||||
httpServerTransport = new Netty3HttpServerTransport(settings, networkService, bigArrays, threadPool);
|
||||
HttpRequest httpRequest = new TestHttpRequest();
|
||||
httpRequest.headers().add(HttpHeaders.Names.ORIGIN, "remote");
|
||||
WriteCapturingChannel writeCapturingChannel = new WriteCapturingChannel();
|
||||
NettyHttpRequest request = new NettyHttpRequest(httpRequest, writeCapturingChannel);
|
||||
Netty3HttpRequest request = new Netty3HttpRequest(httpRequest, writeCapturingChannel);
|
||||
|
||||
// send a response
|
||||
NettyHttpChannel channel = new NettyHttpChannel(httpServerTransport, request, null, randomBoolean(), threadPool.getThreadContext());
|
||||
Netty3HttpChannel channel =
|
||||
new Netty3HttpChannel(httpServerTransport, request, null, randomBoolean(), threadPool.getThreadContext());
|
||||
TestReponse resp = new TestReponse();
|
||||
final String customHeader = "custom-header";
|
||||
final String customHeaderValue = "xyz";
|
||||
|
@ -200,14 +201,15 @@ public class NettyHttpChannelTests extends ESTestCase {
|
|||
|
||||
private HttpResponse execRequestWithCors(final Settings settings, final String originValue, final String host) {
|
||||
// construct request and send it over the transport layer
|
||||
httpServerTransport = new NettyHttpServerTransport(settings, networkService, bigArrays, threadPool);
|
||||
httpServerTransport = new Netty3HttpServerTransport(settings, networkService, bigArrays, threadPool);
|
||||
HttpRequest httpRequest = new TestHttpRequest();
|
||||
httpRequest.headers().add(HttpHeaders.Names.ORIGIN, originValue);
|
||||
httpRequest.headers().add(HttpHeaders.Names.HOST, host);
|
||||
WriteCapturingChannel writeCapturingChannel = new WriteCapturingChannel();
|
||||
NettyHttpRequest request = new NettyHttpRequest(httpRequest, writeCapturingChannel);
|
||||
Netty3HttpRequest request = new Netty3HttpRequest(httpRequest, writeCapturingChannel);
|
||||
|
||||
NettyHttpChannel channel = new NettyHttpChannel(httpServerTransport, request, null, randomBoolean(), threadPool.getThreadContext());
|
||||
Netty3HttpChannel channel =
|
||||
new Netty3HttpChannel(httpServerTransport, request, null, randomBoolean(), threadPool.getThreadContext());
|
||||
channel.sendResponse(new TestReponse());
|
||||
|
||||
// get the response
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
|
@ -54,9 +54,9 @@ import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.HOST;
|
|||
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||
|
||||
/**
|
||||
* Tiny helper to send http requests over netty.
|
||||
* Tiny helper to send http requests over netty3.
|
||||
*/
|
||||
public class NettyHttpClient implements Closeable {
|
||||
public class Netty3HttpClient implements Closeable {
|
||||
|
||||
public static Collection<String> returnHttpResponseBodies(Collection<HttpResponse> responses) {
|
||||
List<String> list = new ArrayList<>(responses.size());
|
||||
|
@ -76,7 +76,7 @@ public class NettyHttpClient implements Closeable {
|
|||
|
||||
private final ClientBootstrap clientBootstrap;
|
||||
|
||||
public NettyHttpClient() {
|
||||
public Netty3HttpClient() {
|
||||
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());;
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -32,11 +32,11 @@ import java.util.List;
|
|||
|
||||
import static java.net.InetAddress.getByName;
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.elasticsearch.http.netty.NettyHttpServerTransport.resolvePublishPort;
|
||||
import static org.elasticsearch.http.netty3.Netty3HttpServerTransport.resolvePublishPort;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class NettyHttpPublishPortTests extends ESTestCase {
|
||||
public class Netty3HttpPublishPortTests extends ESTestCase {
|
||||
|
||||
public void testHttpPublishPort() throws Exception {
|
||||
int boundPort = randomIntBetween(9000, 9100);
|
|
@ -16,9 +16,9 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.ESNetty3IntegTestCase;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
|
@ -46,7 +45,7 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
* a single node "cluster". We also force test infrastructure to use the node client instead of the transport client for the same reason.
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numClientNodes = 0, numDataNodes = 1, transportClientRatio = 0)
|
||||
public class NettyHttpRequestSizeLimitIT extends ESNettyIntegTestCase {
|
||||
public class Netty3HttpRequestSizeLimitIT extends ESNetty3IntegTestCase {
|
||||
private static final ByteSizeValue LIMIT = new ByteSizeValue(2, ByteSizeUnit.KB);
|
||||
|
||||
@Override
|
||||
|
@ -82,7 +81,7 @@ public class NettyHttpRequestSizeLimitIT extends ESNettyIntegTestCase {
|
|||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress
|
||||
().boundAddresses());
|
||||
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
try (Netty3HttpClient nettyHttpClient = new Netty3HttpClient()) {
|
||||
Collection<HttpResponse> singleResponse = nettyHttpClient.post(inetSocketTransportAddress.address(), requests[0]);
|
||||
assertThat(singleResponse, hasSize(1));
|
||||
assertAtLeastOnceExpectedStatus(singleResponse, HttpResponseStatus.OK);
|
||||
|
@ -107,7 +106,7 @@ public class NettyHttpRequestSizeLimitIT extends ESNettyIntegTestCase {
|
|||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress
|
||||
().boundAddresses());
|
||||
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
try (Netty3HttpClient nettyHttpClient = new Netty3HttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.put(inetSocketTransportAddress.address(), requestUris);
|
||||
assertThat(responses, hasSize(requestUris.length));
|
||||
assertAllInExpectedStatus(responses, HttpResponseStatus.OK);
|
|
@ -16,17 +16,16 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.http.netty.NettyHttpServerTransport.HttpChannelPipelineFactory;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.elasticsearch.http.netty3.Netty3HttpServerTransport.HttpChannelPipelineFactory;
|
||||
import org.elasticsearch.http.netty3.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty3.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -54,7 +53,7 @@ import java.util.List;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static org.elasticsearch.http.netty.NettyHttpClient.returnHttpResponseBodies;
|
||||
import static org.elasticsearch.http.netty3.Netty3HttpClient.returnHttpResponseBodies;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -65,11 +64,11 @@ import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
|||
/**
|
||||
* This test just tests, if he pipelining works in general with out any connection the elasticsearch handler
|
||||
*/
|
||||
public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
public class Netty3HttpServerPipeliningTests extends ESTestCase {
|
||||
private NetworkService networkService;
|
||||
private ThreadPool threadPool;
|
||||
private MockBigArrays bigArrays;
|
||||
private CustomNettyHttpServerTransport httpServerTransport;
|
||||
private CustomNetty3HttpServerTransport httpServerTransport;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
@ -93,13 +92,13 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
|||
.put("http.pipelining", true)
|
||||
.put("http.port", "0")
|
||||
.build();
|
||||
httpServerTransport = new CustomNettyHttpServerTransport(settings);
|
||||
httpServerTransport = new CustomNetty3HttpServerTransport(settings);
|
||||
httpServerTransport.start();
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress()
|
||||
.boundAddresses());
|
||||
|
||||
List<String> requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
try (Netty3HttpClient nettyHttpClient = new Netty3HttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{}));
|
||||
Collection<String> responseBodies = returnHttpResponseBodies(responses);
|
||||
assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"));
|
||||
|
@ -111,13 +110,13 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
|||
.put("http.pipelining", false)
|
||||
.put("http.port", "0")
|
||||
.build();
|
||||
httpServerTransport = new CustomNettyHttpServerTransport(settings);
|
||||
httpServerTransport = new CustomNetty3HttpServerTransport(settings);
|
||||
httpServerTransport.start();
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress()
|
||||
.boundAddresses());
|
||||
|
||||
List<String> requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
try (Netty3HttpClient nettyHttpClient = new Netty3HttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{}));
|
||||
List<String> responseBodies = new ArrayList<>(returnHttpResponseBodies(responses));
|
||||
// we cannot be sure about the order of the fast requests, but the slow ones should have to be last
|
||||
|
@ -127,20 +126,20 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
class CustomNettyHttpServerTransport extends NettyHttpServerTransport {
|
||||
class CustomNetty3HttpServerTransport extends Netty3HttpServerTransport {
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public CustomNettyHttpServerTransport(Settings settings) {
|
||||
super(settings, NettyHttpServerPipeliningTests.this.networkService,
|
||||
NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool
|
||||
public CustomNetty3HttpServerTransport(Settings settings) {
|
||||
super(settings, Netty3HttpServerPipeliningTests.this.networkService,
|
||||
Netty3HttpServerPipeliningTests.this.bigArrays, Netty3HttpServerPipeliningTests.this.threadPool
|
||||
);
|
||||
this.executorService = Executors.newFixedThreadPool(5);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
|
||||
return new CustomHttpChannelPipelineFactory(this, executorService, NettyHttpServerPipeliningTests.this.threadPool
|
||||
return new CustomHttpChannelPipelineFactory(this, executorService, Netty3HttpServerPipeliningTests.this.threadPool
|
||||
.getThreadContext());
|
||||
}
|
||||
|
||||
|
@ -155,7 +154,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
|||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService,
|
||||
public CustomHttpChannelPipelineFactory(Netty3HttpServerTransport transport, ExecutorService executorService,
|
||||
ThreadContext threadContext) {
|
||||
super(transport, randomBoolean(), threadContext);
|
||||
this.executorService = executorService;
|
|
@ -17,13 +17,13 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.http.netty.cors.CorsConfig;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -45,9 +45,9 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
* Tests for the {@link NettyHttpServerTransport} class.
|
||||
* Tests for the {@link Netty3HttpServerTransport} class.
|
||||
*/
|
||||
public class NettyHttpServerTransportTests extends ESTestCase {
|
||||
public class Netty3HttpServerTransportTests extends ESTestCase {
|
||||
private NetworkService networkService;
|
||||
private ThreadPool threadPool;
|
||||
private MockBigArrays bigArrays;
|
||||
|
@ -79,8 +79,8 @@ public class NettyHttpServerTransportTests extends ESTestCase {
|
|||
.put(SETTING_CORS_ALLOW_HEADERS.getKey(), Strings.collectionToCommaDelimitedString(headers))
|
||||
.put(SETTING_CORS_ALLOW_CREDENTIALS.getKey(), true)
|
||||
.build();
|
||||
final NettyHttpServerTransport transport = new NettyHttpServerTransport(settings, networkService, bigArrays, threadPool);
|
||||
final CorsConfig corsConfig = transport.getCorsConfig();
|
||||
final Netty3HttpServerTransport transport = new Netty3HttpServerTransport(settings, networkService, bigArrays, threadPool);
|
||||
final Netty3CorsConfig corsConfig = transport.getCorsConfig();
|
||||
assertThat(corsConfig.isAnyOriginSupported(), equalTo(true));
|
||||
assertThat(corsConfig.allowedRequestHeaders(), equalTo(headers));
|
||||
assertThat(corsConfig.allowedRequestMethods().stream().map(HttpMethod::getName).collect(Collectors.toSet()), equalTo(methods));
|
|
@ -16,15 +16,14 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.ESNetty3IntegTestCase;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
|
@ -34,7 +33,7 @@ import java.util.Collection;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
|
||||
import static org.elasticsearch.http.netty3.Netty3HttpClient.returnOpaqueIds;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
|
@ -42,7 +41,7 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
*
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class NettyPipeliningDisabledIT extends ESNettyIntegTestCase {
|
||||
public class Netty3PipeliningDisabledIT extends ESNetty3IntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
|
@ -60,7 +59,7 @@ public class NettyPipeliningDisabledIT extends ESNettyIntegTestCase {
|
|||
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses);
|
||||
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
try (Netty3HttpClient nettyHttpClient = new Netty3HttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests);
|
||||
assertThat(responses, hasSize(requests.length));
|
||||
|
|
@ -16,38 +16,33 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http.netty;
|
||||
package org.elasticsearch.http.netty3;
|
||||
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ExternalTestCluster;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.Netty3Plugin;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
|
||||
import static org.elasticsearch.http.netty3.Netty3HttpClient.returnOpaqueIds;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
|
||||
public class NettyPipeliningEnabledIT extends ESIntegTestCase {
|
||||
public class Netty3PipeliningEnabledIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return pluginList(NettyPlugin.class);
|
||||
return pluginList(Netty3Plugin.class);
|
||||
}
|
||||
|
||||
public void testThatNettyHttpServerSupportsPipelining() throws Exception {
|
||||
String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"};
|
||||
|
||||
InetSocketAddress inetSocketAddress = randomFrom(cluster().httpAddresses());
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
try (Netty3HttpClient nettyHttpClient = new Netty3HttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketAddress, requests);
|
||||
assertThat(responses, hasSize(5));
|
||||
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http.netty.pipelining;
|
||||
package org.elasticsearch.http.netty3.pipelining;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.common.util.MockBigArrays;
|
|||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.elasticsearch.transport.netty3.Netty3Transport;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -45,16 +45,16 @@ import static org.hamcrest.Matchers.is;
|
|||
* This test checks, if a HTTP look-alike request (starting with a HTTP method and a space)
|
||||
* actually returns text response instead of just dropping the connection
|
||||
*/
|
||||
public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
|
||||
public class Netty3SizeHeaderFrameDecoderTests extends ESTestCase {
|
||||
|
||||
private final Settings settings = Settings.builder()
|
||||
.put("node.name", "NettySizeHeaderFrameDecoderTests")
|
||||
.put("node.name", "Netty3SizeHeaderFrameDecoderTests")
|
||||
.put(TransportSettings.BIND_HOST.getKey(), "127.0.0.1")
|
||||
.put(TransportSettings.PORT.getKey(), "0")
|
||||
.build();
|
||||
|
||||
private ThreadPool threadPool;
|
||||
private NettyTransport nettyTransport;
|
||||
private Netty3Transport nettyTransport;
|
||||
private int port;
|
||||
private InetAddress host;
|
||||
|
||||
|
@ -63,7 +63,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
|
|||
threadPool = new ThreadPool(settings);
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, new NamedWriteableRegistry(),
|
||||
nettyTransport = new Netty3Transport(settings, threadPool, networkService, bigArrays, new NamedWriteableRegistry(),
|
||||
new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
TransportService transportService = new TransportService(settings, nettyTransport, threadPool);
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
|
||||
|
@ -39,7 +39,7 @@ public class ChannelBufferBytesReferenceTests extends AbstractBytesReferenceTest
|
|||
assertEquals(ref.length(), length);
|
||||
BytesRef bytesRef = ref.toBytesRef();
|
||||
final ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(bytesRef.bytes, bytesRef.offset, bytesRef.length);
|
||||
return NettyUtils.toBytesReference(channelBuffer);
|
||||
return Netty3Utils.toBytesReference(channelBuffer);
|
||||
}
|
||||
|
||||
public void testSliceOnAdvancedBuffer() throws IOException {
|
||||
|
@ -51,7 +51,7 @@ public class ChannelBufferBytesReferenceTests extends AbstractBytesReferenceTest
|
|||
for (int i = 0; i < numBytesToRead; i++) {
|
||||
channelBuffer.readByte();
|
||||
}
|
||||
BytesReference other = NettyUtils.toBytesReference(channelBuffer);
|
||||
BytesReference other = Netty3Utils.toBytesReference(channelBuffer);
|
||||
BytesReference slice = bytesReference.slice(numBytesToRead, bytesReference.length() - numBytesToRead);
|
||||
assertEquals(other, slice);
|
||||
assertEquals(other.slice(3, 1), slice.slice(3, 1));
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -49,9 +49,7 @@ import static java.util.Collections.emptySet;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NettyScheduledPingTests extends ESTestCase {
|
||||
public class Netty3ScheduledPingTests extends ESTestCase {
|
||||
public void testScheduledPing() throws Exception {
|
||||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
|
@ -64,14 +62,14 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||
|
||||
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
|
||||
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings),
|
||||
final Netty3Transport nettyA = new Netty3Transport(settings, threadPool, new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, registryA, circuitBreakerService);
|
||||
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
|
||||
serviceA.start();
|
||||
serviceA.acceptIncomingRequests();
|
||||
|
||||
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
|
||||
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings),
|
||||
final Netty3Transport nettyB = new Netty3Transport(settings, threadPool, new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, registryB, circuitBreakerService);
|
||||
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
|
||||
|
|
@ -16,9 +16,9 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.ESNetty3IntegTestCase;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
|
@ -27,23 +27,17 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -55,11 +49,8 @@ import java.util.List;
|
|||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class NettyTransportIT extends ESNettyIntegTestCase {
|
||||
public class Netty3TransportIT extends ESNetty3IntegTestCase {
|
||||
// static so we can use it in anonymous classes
|
||||
private static String channelProfileName = null;
|
||||
|
||||
|
@ -72,7 +63,7 @@ public class NettyTransportIT extends ESNettyIntegTestCase {
|
|||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
List<Class<? extends Plugin>> list = new ArrayList<>();
|
||||
list.add(ExceptionThrowingNettyTransport.TestPlugin.class);
|
||||
list.add(ExceptionThrowingNetty3Transport.TestPlugin.class);
|
||||
list.addAll(super.nodePlugins());
|
||||
return Collections.unmodifiableCollection(list);
|
||||
}
|
||||
|
@ -90,18 +81,22 @@ public class NettyTransportIT extends ESNettyIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
|
||||
public static final class ExceptionThrowingNetty3Transport extends Netty3Transport {
|
||||
|
||||
public static class TestPlugin extends Plugin {
|
||||
public void onModule(NetworkModule module) {
|
||||
module.registerTransport("exception-throwing", ExceptionThrowingNettyTransport.class);
|
||||
module.registerTransport("exception-throwing", ExceptionThrowingNetty3Transport.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
public ExceptionThrowingNetty3Transport(
|
||||
Settings settings,
|
||||
ThreadPool threadPool,
|
||||
NetworkService networkService,
|
||||
BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
|
@ -16,9 +16,9 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.ESNetty3IntegTestCase;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
|
@ -34,7 +34,7 @@ import org.elasticsearch.env.Environment;
|
|||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.junit.annotations.Network;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.Netty3Plugin;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.Locale;
|
||||
|
@ -48,7 +48,7 @@ import static org.hamcrest.Matchers.is;
|
|||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0)
|
||||
public class NettyTransportMultiPortIntegrationIT extends ESNettyIntegTestCase {
|
||||
public class Netty3TransportMultiPortIntegrationIT extends ESNetty3IntegTestCase {
|
||||
|
||||
private static int randomPort = -1;
|
||||
private static String randomPortRange;
|
||||
|
@ -72,10 +72,10 @@ public class NettyTransportMultiPortIntegrationIT extends ESNettyIntegTestCase {
|
|||
public void testThatTransportClientCanConnect() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put("cluster.name", internalCluster().getClusterName())
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3")
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.build();
|
||||
try (TransportClient transportClient = TransportClient.builder().addPlugin(NettyPlugin.class).settings(settings).build()) {
|
||||
try (TransportClient transportClient = TransportClient.builder().addPlugin(Netty3Plugin.class).settings(settings).build()) {
|
||||
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), randomPort));
|
||||
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
|
||||
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -36,7 +36,7 @@ import org.junit.Before;
|
|||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
public class Netty3TransportMultiPortTests extends ESTestCase {
|
||||
|
||||
private String host;
|
||||
|
||||
|
@ -135,7 +135,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
|
||||
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
TcpTransport<?> transport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
TcpTransport<?> transport = new Netty3Transport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
transport.start();
|
||||
|
|
@ -17,18 +17,16 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.ESNettyIntegTestCase;
|
||||
import org.elasticsearch.ESNetty3IntegTestCase;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.net.Inet4Address;
|
||||
|
@ -42,7 +40,7 @@ import static org.hamcrest.Matchers.instanceOf;
|
|||
* different ports on ipv4 and ipv6.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class NettyTransportPublishAddressIT extends ESNettyIntegTestCase {
|
||||
public class Netty3TransportPublishAddressIT extends ESNetty3IntegTestCase {
|
||||
|
||||
public void testDifferentPorts() throws Exception {
|
||||
if (!NetworkUtils.SUPPORTS_V6) {
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
|
||||
|
@ -32,13 +32,13 @@ import org.jboss.netty.buffer.CompositeChannelBuffer;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
public class NettyUtilsTests extends ESTestCase {
|
||||
public class Netty3UtilsTests extends ESTestCase {
|
||||
|
||||
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
|
||||
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
|
||||
|
||||
public void testToChannelBufferWithEmptyRef() throws IOException {
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(getRandomizedBytesReference(0));
|
||||
ChannelBuffer channelBuffer = Netty3Utils.toChannelBuffer(getRandomizedBytesReference(0));
|
||||
assertSame(ChannelBuffers.EMPTY_BUFFER, channelBuffer);
|
||||
}
|
||||
|
||||
|
@ -47,8 +47,8 @@ public class NettyUtilsTests extends ESTestCase {
|
|||
int sliceOffset = randomIntBetween(0, ref.length());
|
||||
int sliceLength = randomIntBetween(ref.length() - sliceOffset, ref.length() - sliceOffset);
|
||||
BytesReference slice = ref.slice(sliceOffset, sliceLength);
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(slice);
|
||||
BytesReference bytesReference = NettyUtils.toBytesReference(channelBuffer);
|
||||
ChannelBuffer channelBuffer = Netty3Utils.toChannelBuffer(slice);
|
||||
BytesReference bytesReference = Netty3Utils.toBytesReference(channelBuffer);
|
||||
assertArrayEquals(BytesReference.toBytes(slice), BytesReference.toBytes(bytesReference));
|
||||
}
|
||||
|
||||
|
@ -56,16 +56,16 @@ public class NettyUtilsTests extends ESTestCase {
|
|||
BytesReference ref = getRandomizedBytesReference(randomIntBetween(1, 3 * PAGE_SIZE));
|
||||
int sliceOffset = randomIntBetween(0, ref.length());
|
||||
int sliceLength = randomIntBetween(ref.length() - sliceOffset, ref.length() - sliceOffset);
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(ref);
|
||||
BytesReference bytesReference = NettyUtils.toBytesReference(channelBuffer);
|
||||
ChannelBuffer channelBuffer = Netty3Utils.toChannelBuffer(ref);
|
||||
BytesReference bytesReference = Netty3Utils.toBytesReference(channelBuffer);
|
||||
assertArrayEquals(BytesReference.toBytes(ref.slice(sliceOffset, sliceLength)),
|
||||
BytesReference.toBytes(bytesReference.slice(sliceOffset, sliceLength)));
|
||||
}
|
||||
|
||||
public void testToChannelBuffer() throws IOException {
|
||||
BytesReference ref = getRandomizedBytesReference(randomIntBetween(1, 3 * PAGE_SIZE));
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(ref);
|
||||
BytesReference bytesReference = NettyUtils.toBytesReference(channelBuffer);
|
||||
ChannelBuffer channelBuffer = Netty3Utils.toChannelBuffer(ref);
|
||||
BytesReference bytesReference = Netty3Utils.toBytesReference(channelBuffer);
|
||||
if (ref instanceof ChannelBufferBytesReference) {
|
||||
assertEquals(channelBuffer, ((ChannelBufferBytesReference) ref).toChannelBuffer());
|
||||
} else if (AbstractBytesReferenceTestCase.getNumPages(ref) > 1) { // we gather the buffers into a channel buffer
|
||||
|
@ -87,7 +87,7 @@ public class NettyUtilsTests extends ESTestCase {
|
|||
return new BytesArray(ref.toBytesRef());
|
||||
} else if (randomBoolean()) {
|
||||
BytesRef bytesRef = ref.toBytesRef();
|
||||
return NettyUtils.toBytesReference(ChannelBuffers.wrappedBuffer(bytesRef.bytes, bytesRef.offset,
|
||||
return Netty3Utils.toBytesReference(ChannelBuffers.wrappedBuffer(bytesRef.bytes, bytesRef.offset,
|
||||
bytesRef.length));
|
||||
} else {
|
||||
return ref;
|
|
@ -17,10 +17,9 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport.netty3;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
|
@ -42,13 +41,13 @@ import static java.util.Collections.emptyMap;
|
|||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
|
||||
public class SimpleNetty3TransportTests extends AbstractSimpleTransportTestCase {
|
||||
|
||||
public static MockTransportService nettyFromThreadPool(
|
||||
Settings settings,
|
||||
ThreadPool threadPool, final Version version) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
|
||||
Transport transport = new Netty3Transport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getCurrentVersion() {
|
|
@ -10,4 +10,4 @@
|
|||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.modules.0.name: transport-netty }
|
||||
- match: { nodes.$master.modules.0.name: transport-netty3 }
|
|
@ -117,46 +117,46 @@ thirdPartyAudit.excludes = [
|
|||
'com.sun.jersey.spi.container.servlet.ServletContainer',
|
||||
'com.sun.jersey.spi.inject.Injectable',
|
||||
'com.sun.jersey.spi.inject.InjectableProvider',
|
||||
'io.netty.bootstrap.Bootstrap',
|
||||
'io.netty.bootstrap.ChannelFactory',
|
||||
'io.netty.bootstrap.ServerBootstrap',
|
||||
'io.netty.buffer.ByteBuf',
|
||||
'io.netty.buffer.Unpooled',
|
||||
'io.netty.channel.Channel',
|
||||
'io.netty.channel.ChannelFuture',
|
||||
'io.netty.channel.ChannelFutureListener',
|
||||
'io.netty.channel.ChannelHandler',
|
||||
'io.netty.channel.ChannelHandlerContext',
|
||||
'io.netty.channel.ChannelInboundHandlerAdapter',
|
||||
'io.netty.channel.ChannelInitializer',
|
||||
'io.netty.channel.ChannelPipeline',
|
||||
'io.netty.channel.EventLoopGroup',
|
||||
'io.netty.channel.SimpleChannelInboundHandler',
|
||||
'io.netty.channel.group.ChannelGroup',
|
||||
'io.netty.channel.group.ChannelGroupFuture',
|
||||
'io.netty.channel.group.DefaultChannelGroup',
|
||||
'io.netty.channel.nio.NioEventLoopGroup',
|
||||
'io.netty.channel.socket.SocketChannel',
|
||||
'io.netty.channel.socket.nio.NioServerSocketChannel',
|
||||
'io.netty.channel.socket.nio.NioSocketChannel',
|
||||
'io.netty.handler.codec.http.DefaultFullHttpRequest',
|
||||
'io.netty.handler.codec.http.DefaultFullHttpResponse',
|
||||
'io.netty.handler.codec.http.DefaultHttpResponse',
|
||||
'io.netty.handler.codec.http.HttpContent',
|
||||
'io.netty.handler.codec.http.HttpHeaders',
|
||||
'io.netty.handler.codec.http.HttpMethod',
|
||||
'io.netty.handler.codec.http.HttpRequest',
|
||||
'io.netty.handler.codec.http.HttpRequestDecoder',
|
||||
'io.netty.handler.codec.http.HttpRequestEncoder',
|
||||
'io.netty.handler.codec.http.HttpResponseEncoder',
|
||||
'io.netty.handler.codec.http.HttpResponseStatus',
|
||||
'io.netty.handler.codec.http.HttpVersion',
|
||||
'io.netty.handler.codec.http.QueryStringDecoder',
|
||||
'io.netty.handler.codec.string.StringEncoder',
|
||||
'io.netty.handler.ssl.SslHandler',
|
||||
'io.netty.handler.stream.ChunkedStream',
|
||||
'io.netty.handler.stream.ChunkedWriteHandler',
|
||||
'io.netty.util.concurrent.GlobalEventExecutor',
|
||||
'io.netty.bootstrap.Bootstrap',
|
||||
'io.netty.bootstrap.ChannelFactory',
|
||||
'io.netty.bootstrap.ServerBootstrap',
|
||||
'io.netty.buffer.ByteBuf',
|
||||
'io.netty.buffer.Unpooled',
|
||||
'io.netty.channel.Channel',
|
||||
'io.netty.channel.ChannelFuture',
|
||||
'io.netty.channel.ChannelFutureListener',
|
||||
'io.netty.channel.ChannelHandler',
|
||||
'io.netty.channel.ChannelHandlerContext',
|
||||
'io.netty.channel.ChannelInboundHandlerAdapter',
|
||||
'io.netty.channel.ChannelInitializer',
|
||||
'io.netty.channel.ChannelPipeline',
|
||||
'io.netty.channel.EventLoopGroup',
|
||||
'io.netty.channel.SimpleChannelInboundHandler',
|
||||
'io.netty.channel.group.ChannelGroup',
|
||||
'io.netty.channel.group.ChannelGroupFuture',
|
||||
'io.netty.channel.group.DefaultChannelGroup',
|
||||
'io.netty.channel.nio.NioEventLoopGroup',
|
||||
'io.netty.channel.socket.SocketChannel',
|
||||
'io.netty.channel.socket.nio.NioServerSocketChannel',
|
||||
'io.netty.channel.socket.nio.NioSocketChannel',
|
||||
'io.netty.handler.codec.http.DefaultFullHttpRequest',
|
||||
'io.netty.handler.codec.http.DefaultFullHttpResponse',
|
||||
'io.netty.handler.codec.http.DefaultHttpResponse',
|
||||
'io.netty.handler.codec.http.HttpContent',
|
||||
'io.netty.handler.codec.http.HttpHeaders',
|
||||
'io.netty.handler.codec.http.HttpMethod',
|
||||
'io.netty.handler.codec.http.HttpRequest',
|
||||
'io.netty.handler.codec.http.HttpRequestDecoder',
|
||||
'io.netty.handler.codec.http.HttpRequestEncoder',
|
||||
'io.netty.handler.codec.http.HttpResponseEncoder',
|
||||
'io.netty.handler.codec.http.HttpResponseStatus',
|
||||
'io.netty.handler.codec.http.HttpVersion',
|
||||
'io.netty.handler.codec.http.QueryStringDecoder',
|
||||
'io.netty.handler.codec.string.StringEncoder',
|
||||
'io.netty.handler.ssl.SslHandler',
|
||||
'io.netty.handler.stream.ChunkedStream',
|
||||
'io.netty.handler.stream.ChunkedWriteHandler',
|
||||
'io.netty.util.concurrent.GlobalEventExecutor',
|
||||
'javax.ws.rs.core.Context',
|
||||
'javax.ws.rs.core.MediaType',
|
||||
'javax.ws.rs.core.MultivaluedMap',
|
||||
|
|
|
@ -22,5 +22,5 @@ apply plugin: 'elasticsearch.rest-test'
|
|||
// TODO: this test works, but it isn't really a rest test...should we have another plugin for "non rest test that just needs N clusters?"
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: ':modules:transport-netty', configuration: 'runtime') // randomly swapped in as a transport
|
||||
testCompile project(path: ':modules:transport-netty3', configuration: 'runtime') // randomly swapped in as a transport
|
||||
}
|
|
@ -31,12 +31,9 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.internal.InternalSettingsPreparer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.Netty3Plugin;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -81,7 +78,7 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
|
|||
protected String index;
|
||||
|
||||
public static final class BogusPlugin extends Plugin {
|
||||
// se NettyPlugin.... this runs without the permission from the netty module so it will fail since reindex can't set the property
|
||||
// se Netty3Plugin.... this runs without the permission from the netty3 module so it will fail since reindex can't set the property
|
||||
// to make it still work we disable that check but need to register the setting first
|
||||
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
|
||||
Setting.Property.NodeScope);
|
||||
|
@ -98,8 +95,8 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
|
|||
.put("client.transport.ignore_cluster_name", true)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir);
|
||||
if (random().nextBoolean()) {
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NettyPlugin.NETTY_TRANSPORT_NAME);
|
||||
transportClientBuilder.addPlugin(NettyPlugin.class);
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME);
|
||||
transportClientBuilder.addPlugin(Netty3Plugin.class);
|
||||
transportClientBuilder.addPlugin(BogusPlugin.class);
|
||||
builder.put("netty.assert.buglevel", false); // see BogusPlugin
|
||||
} else {
|
||||
|
|
|
@ -20,5 +20,5 @@
|
|||
apply plugin: 'elasticsearch.rest-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: ':modules:transport-netty', configuration: 'runtime') // for http
|
||||
testCompile project(path: ':modules:transport-netty3', configuration: 'runtime') // for http
|
||||
}
|
|
@ -23,9 +23,8 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.MockTcpTransport;
|
||||
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
||||
import org.elasticsearch.transport.NettyPlugin;
|
||||
import org.elasticsearch.transport.Netty3Plugin;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -38,19 +37,19 @@ public abstract class HttpSmokeTestCase extends ESIntegTestCase {
|
|||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("netty.assert.buglevel", false)
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, randomFrom(NettyPlugin.NETTY_TRANSPORT_NAME,
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, randomFrom(Netty3Plugin.NETTY_TRANSPORT_NAME,
|
||||
MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME))
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), true).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(MockTcpTransportPlugin.class, NettyPlugin.class, BogusPlugin.class);
|
||||
return pluginList(MockTcpTransportPlugin.class, Netty3Plugin.class, BogusPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return pluginList(MockTcpTransportPlugin.class, NettyPlugin.class, BogusPlugin.class);
|
||||
return pluginList(MockTcpTransportPlugin.class, Netty3Plugin.class, BogusPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,7 +57,7 @@ public abstract class HttpSmokeTestCase extends ESIntegTestCase {
|
|||
return Settings.builder()
|
||||
.put(super.transportClientSettings())
|
||||
.put("netty.assert.buglevel", false)
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, randomFrom(NettyPlugin.NETTY_TRANSPORT_NAME,
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, randomFrom(Netty3Plugin.NETTY_TRANSPORT_NAME,
|
||||
MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)).build();
|
||||
}
|
||||
|
||||
|
@ -69,7 +68,7 @@ public abstract class HttpSmokeTestCase extends ESIntegTestCase {
|
|||
|
||||
|
||||
public static final class BogusPlugin extends Plugin {
|
||||
// see NettyPlugin.... this runs without the permission from the netty module so it will fail since reindex can't set the property
|
||||
// see Netty3Plugin.... this runs without the permission from the netty3 module so it will fail since reindex can't set the property
|
||||
// to make it still work we disable that check but need to register the setting first
|
||||
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
|
||||
Setting.Property.NodeScope);
|
||||
|
|
|
@ -24,7 +24,7 @@ List projects = [
|
|||
'modules:lang-groovy',
|
||||
'modules:lang-mustache',
|
||||
'modules:lang-painless',
|
||||
'modules:transport-netty',
|
||||
'modules:transport-netty3',
|
||||
'modules:reindex',
|
||||
'modules:percolator',
|
||||
'plugins:analysis-icu',
|
||||
|
|
|
@ -242,7 +242,7 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase {
|
|||
|
||||
protected Settings commonNodeSettings(int nodeOrdinal) {
|
||||
Settings.Builder builder = Settings.builder().put(requiredSettings());
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty"); // run same transport / disco as external
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3"); // run same transport / disco as external
|
||||
builder.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen");
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ final class ExternalNode implements Closeable {
|
|||
|
||||
public static final Settings REQUIRED_SETTINGS = Settings.builder()
|
||||
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen")
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty").build(); // we need network mode for this
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3").build(); // we need network mode for this
|
||||
|
||||
private final Path path;
|
||||
private final Random random;
|
||||
|
|
|
@ -34,7 +34,8 @@ import java.util.List;
|
|||
*/
|
||||
public final class Features {
|
||||
|
||||
private static final List<String> SUPPORTED = Arrays.asList("stash_in_path", "groovy_scripting", "headers", "embedded_stash_key", "yaml");
|
||||
private static final List<String> SUPPORTED =
|
||||
Arrays.asList("stash_in_path", "groovy_scripting", "headers", "embedded_stash_key", "yaml");
|
||||
|
||||
private Features() {
|
||||
|
||||
|
|
Loading…
Reference in New Issue