Factor out abstract TCPTransport* classes to reduce the netty footprint (#19096)
Today we have a ton of logic inside the NettyTransport* codebase. The footprint of the code that has a direct netty dependency is large and alternative implementations are pretty hard today since they need to know all about our proticol etc. This change moves most of the code into TCPTransport* baseclasses and moves all the protocol send code together. The base classes now contain the majority of the logic while NettyTransport* classes remain to implement the glue code, configuration and optimization.
This commit is contained in:
parent
0d7c11ea1d
commit
40ec639c89
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -496,7 +497,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.index.shard.IndexShardStartedException::new, 23),
|
||||
SEARCH_CONTEXT_MISSING_EXCEPTION(org.elasticsearch.search.SearchContextMissingException.class,
|
||||
org.elasticsearch.search.SearchContextMissingException::new, 24),
|
||||
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
|
||||
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
|
||||
org.elasticsearch.script.GeneralScriptException::new, 25),
|
||||
BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
|
||||
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26),
|
||||
|
@ -676,8 +677,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.indices.IndexAlreadyExistsException::new, 123),
|
||||
SCRIPT_PARSE_EXCEPTION(org.elasticsearch.script.Script.ScriptParseException.class,
|
||||
org.elasticsearch.script.Script.ScriptParseException::new, 124),
|
||||
HTTP_ON_TRANSPORT_EXCEPTION(org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class,
|
||||
org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException::new, 125),
|
||||
HTTP_ON_TRANSPORT_EXCEPTION(TcpTransport.HttpOnTransportException.class,
|
||||
TcpTransport.HttpOnTransportException::new, 125),
|
||||
MAPPER_PARSING_EXCEPTION(org.elasticsearch.index.mapper.MapperParsingException.class,
|
||||
org.elasticsearch.index.mapper.MapperParsingException::new, 126),
|
||||
SEARCH_CONTEXT_EXCEPTION(org.elasticsearch.search.SearchContextException.class,
|
||||
|
|
|
@ -50,8 +50,8 @@ import org.elasticsearch.plugins.PluginsService;
|
|||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
|
@ -107,7 +107,7 @@ public class TransportClient extends AbstractClient {
|
|||
|
||||
private PluginsService newPluginService(final Settings settings) {
|
||||
final Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
|
||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
|
||||
.put(InternalSettingsPreparer.prepareSettings(settings))
|
||||
.put(NetworkService.NETWORK_SERVER.getKey(), false)
|
||||
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.common.io;
|
||||
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
@ -159,25 +158,6 @@ public final class Channels {
|
|||
return bytesRead;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copies bytes from source {@link org.jboss.netty.buffer.ChannelBuffer} to a {@link java.nio.channels.GatheringByteChannel}
|
||||
*
|
||||
* @param source ChannelBuffer to copy from
|
||||
* @param sourceIndex index in <i>source</i> to start copying from
|
||||
* @param length how many bytes to copy
|
||||
* @param channel target GatheringByteChannel
|
||||
*/
|
||||
public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException {
|
||||
while (length > 0) {
|
||||
int written = source.getBytes(sourceIndex, channel, length);
|
||||
sourceIndex += written;
|
||||
length -= written;
|
||||
}
|
||||
assert length == 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
|
||||
*
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
|
||||
/**
|
||||
* A marker to not remove frame decoder from the resulting jar so plugins can use it.
|
||||
*/
|
||||
public class KeepFrameDecoder extends FrameDecoder {
|
||||
|
||||
public static final KeepFrameDecoder decoder = new KeepFrameDecoder();
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
||||
/**
|
||||
* A channel listener that releases a {@link org.elasticsearch.common.lease.Releasable} when
|
||||
* the operation is complete.
|
||||
*/
|
||||
public class ReleaseChannelFutureListener implements ChannelFutureListener {
|
||||
|
||||
private final Releasable releasable;
|
||||
|
||||
public ReleaseChannelFutureListener(Releasable releasable) {
|
||||
this.releasable = releasable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
releasable.close();
|
||||
}
|
||||
}
|
|
@ -89,6 +89,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
@ -279,14 +280,14 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
TransportSettings.PUBLISH_PORT,
|
||||
TransportSettings.PORT,
|
||||
NettyTransport.WORKER_COUNT,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_REG,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_STATE,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_PING,
|
||||
NettyTransport.PING_SCHEDULE,
|
||||
NettyTransport.TCP_BLOCKING_CLIENT,
|
||||
NettyTransport.TCP_CONNECT_TIMEOUT,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_REG,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_STATE,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_PING,
|
||||
TcpTransport.PING_SCHEDULE,
|
||||
TcpTransport.TCP_BLOCKING_CLIENT,
|
||||
TcpTransport.TCP_CONNECT_TIMEOUT,
|
||||
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
|
@ -294,12 +295,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NetworkService.NETWORK_SERVER,
|
||||
NettyTransport.NETTY_BOSS_COUNT,
|
||||
NettyTransport.TCP_NO_DELAY,
|
||||
NettyTransport.TCP_KEEP_ALIVE,
|
||||
NettyTransport.TCP_REUSE_ADDRESS,
|
||||
NettyTransport.TCP_SEND_BUFFER_SIZE,
|
||||
NettyTransport.TCP_RECEIVE_BUFFER_SIZE,
|
||||
NettyTransport.TCP_BLOCKING_SERVER,
|
||||
TcpTransport.TCP_NO_DELAY,
|
||||
TcpTransport.TCP_KEEP_ALIVE,
|
||||
TcpTransport.TCP_REUSE_ADDRESS,
|
||||
TcpTransport.TCP_SEND_BUFFER_SIZE,
|
||||
TcpTransport.TCP_RECEIVE_BUFFER_SIZE,
|
||||
TcpTransport.TCP_BLOCKING_SERVER,
|
||||
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
||||
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
||||
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.http.netty;
|
||||
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
|
|
|
@ -24,8 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
|
@ -128,7 +127,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
}
|
||||
|
||||
if (content instanceof Releasable) {
|
||||
future.addListener(new ReleaseChannelFutureListener((Releasable) content));
|
||||
future.addListener((x) -> ((Releasable)content).close());
|
||||
addedReleaseListener = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.http.netty;
|
|||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.support.RestUtils;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
|
|
@ -24,8 +24,8 @@ import com.carrotsearch.hppc.IntSet;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TcpHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
public static final int REQUEST_ID_SIZE = 8;
|
||||
|
||||
public static final int STATUS_SIZE = 1;
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int messageSize) throws IOException {
|
||||
output.writeByte((byte)'E');
|
||||
output.writeByte((byte)'S');
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
output.writeInt(messageSize - TcpHeader.MARKER_BYTES_SIZE - TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
output.writeLong(requestId);
|
||||
output.writeByte(status);
|
||||
output.writeInt(version.id);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public final class TcpTransportChannel<Channel> implements TransportChannel {
|
||||
private final TcpTransport<Channel> transport;
|
||||
protected final Version version;
|
||||
protected final String action;
|
||||
protected final long requestId;
|
||||
private final String profileName;
|
||||
private final long reservedBytes;
|
||||
private final AtomicBoolean released = new AtomicBoolean();
|
||||
private final String channelType;
|
||||
private final Channel channel;
|
||||
|
||||
public TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
|
||||
long requestId, Version version, String profileName, long reservedBytes) {
|
||||
this.version = version;
|
||||
this.channel = channel;
|
||||
this.transport = transport;
|
||||
this.action = action;
|
||||
this.requestId = requestId;
|
||||
this.profileName = profileName;
|
||||
this.reservedBytes = reservedBytes;
|
||||
this.channelType = channelType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String action() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void sendResponse(TransportResponse response) throws IOException {
|
||||
sendResponse(response, TransportResponseOptions.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
release();
|
||||
transport.sendResponse(version, channel, response, requestId, action, options);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
release();
|
||||
transport.sendErrorResponse(version, channel, error, requestId, action);
|
||||
}
|
||||
|
||||
private void release() {
|
||||
// attempt to release once atomically
|
||||
if (released.compareAndSet(false, true) == false) {
|
||||
throw new IllegalStateException("reserved bytes are already released");
|
||||
}
|
||||
transport.getInFlightRequestBreaker().addWithoutBreaking(-reservedBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getChannelType() {
|
||||
return channelType;
|
||||
}
|
||||
|
||||
public Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,6 +20,8 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
|
@ -94,4 +96,9 @@ public interface Transport extends LifecycleComponent<Transport> {
|
|||
long serverOpen();
|
||||
|
||||
List<String> getLocalAddresses();
|
||||
|
||||
default CircuitBreaker getInFlightRequestBreaker() {
|
||||
return new NoopCircuitBreaker("in-flight-noop");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -39,10 +38,10 @@ public enum Transports {
|
|||
final String threadName = t.getName();
|
||||
for (String s : Arrays.asList(
|
||||
LocalTransport.LOCAL_TRANSPORT_THREAD_NAME_PREFIX,
|
||||
NettyTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
|
||||
NettyTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
|
||||
NettyTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
|
||||
NettyTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
|
||||
TcpTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
|
||||
TcpTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
|
||||
TcpTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
|
||||
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
|
||||
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
|
||||
if (threadName.contains(s)) {
|
||||
return true;
|
||||
|
|
|
@ -16,13 +16,12 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -34,9 +33,12 @@ import java.nio.charset.StandardCharsets;
|
|||
final class ChannelBufferBytesReference implements BytesReference {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
private final int size;
|
||||
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer) {
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer, int size) {
|
||||
this.buffer = buffer;
|
||||
this.size = size;
|
||||
assert size <= buffer.readableBytes() : "size[" + size +"] > " + buffer.readableBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -46,25 +48,24 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
|
||||
@Override
|
||||
public int length() {
|
||||
return buffer.readableBytes();
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesReference slice(int from, int length) {
|
||||
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length));
|
||||
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length), length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamInput streamInput() {
|
||||
return ChannelBufferStreamInputFactory.create(buffer.duplicate());
|
||||
return new ChannelBufferStreamInput(buffer.duplicate(), size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream os) throws IOException {
|
||||
buffer.getBytes(buffer.readerIndex(), os, length());
|
||||
buffer.getBytes(buffer.readerIndex(), os, size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
return copyBytesArray().toBytes();
|
||||
}
|
||||
|
@ -72,7 +73,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
@Override
|
||||
public BytesArray toBytesArray() {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
|
||||
}
|
||||
return copyBytesArray();
|
||||
}
|
||||
|
@ -111,7 +112,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
@Override
|
||||
public BytesRef toBytesRef() {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
|
||||
}
|
||||
byte[] copy = new byte[buffer.readableBytes()];
|
||||
buffer.getBytes(buffer.readerIndex(), copy);
|
||||
|
@ -120,7 +121,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
|
||||
@Override
|
||||
public BytesRef copyBytesRef() {
|
||||
byte[] copy = new byte[buffer.readableBytes()];
|
||||
byte[] copy = new byte[size];
|
||||
buffer.getBytes(buffer.readerIndex(), copy);
|
||||
return new BytesRef(copy);
|
||||
}
|
|
@ -22,16 +22,14 @@ package org.elasticsearch.transport.netty;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}.
|
||||
*/
|
||||
public class ChannelBufferStreamInput extends StreamInput {
|
||||
class ChannelBufferStreamInput extends StreamInput {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
private final int startIndex;
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ChannelBufferStreamInputFactory {
|
||||
|
||||
public static StreamInput create(ChannelBuffer buffer) {
|
||||
return new ChannelBufferStreamInput(buffer, buffer.readableBytes());
|
||||
}
|
||||
|
||||
public static StreamInput create(ChannelBuffer buffer, int size) {
|
||||
return new ChannelBufferStreamInput(buffer, size);
|
||||
}
|
||||
}
|
|
@ -1,437 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.compress.NotCompressedException;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportSerializationException;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.WriteCompletionEvent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final ESLogger logger;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
|
||||
this.threadPool = transport.threadPool();
|
||||
this.threadContext = threadPool.getThreadContext();
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.logger = logger;
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
|
||||
transportServiceAdapter.sent(e.getWrittenAmount());
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Transports.assertTransportThread();
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
Marker marker = new Marker(buffer);
|
||||
int size = marker.messageSizeWithRemainingHeaders();
|
||||
transportServiceAdapter.received(marker.messageSizeWithAllHeaders());
|
||||
|
||||
// we have additional bytes to read, outside of the header
|
||||
boolean hasMessageBytesToRead = marker.messageSize() != 0;
|
||||
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time
|
||||
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
|
||||
boolean success = false;
|
||||
try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) {
|
||||
long requestId = streamIn.readLong();
|
||||
byte status = streamIn.readByte();
|
||||
Version version = Version.fromId(streamIn.readInt());
|
||||
|
||||
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
|
||||
Compressor compressor;
|
||||
try {
|
||||
compressor = CompressorFactory.compressor(NettyUtils.toBytesReference(buffer));
|
||||
} catch (NotCompressedException ex) {
|
||||
int maxToRead = Math.min(buffer.readableBytes(), 10);
|
||||
int offset = buffer.readerIndex();
|
||||
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
|
||||
.append("] content bytes out of [").append(buffer.readableBytes())
|
||||
.append("] readable bytes with message size [").append(size).append("] ").append("] are [");
|
||||
for (int i = 0; i < maxToRead; i++) {
|
||||
sb.append(buffer.getByte(offset + i)).append(",");
|
||||
}
|
||||
sb.append("]");
|
||||
throw new IllegalStateException(sb.toString());
|
||||
}
|
||||
streamIn = compressor.streamInput(streamIn);
|
||||
}
|
||||
if (version.onOrAfter(Version.CURRENT.minimumCompatibilityVersion()) == false || version.major != Version.CURRENT.major) {
|
||||
throw new IllegalStateException("Received message from unsupported version: [" + version
|
||||
+ "] minimal compatible version is: [" +Version.CURRENT.minimumCompatibilityVersion() + "]");
|
||||
}
|
||||
streamIn.setVersion(version);
|
||||
if (TransportStatus.isRequest(status)) {
|
||||
threadContext.readHeaders(streamIn);
|
||||
handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
|
||||
} else {
|
||||
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
// ignore if its null, the adapter logs it
|
||||
if (handler != null) {
|
||||
if (TransportStatus.isError(status)) {
|
||||
handlerResponseError(streamIn, handler);
|
||||
} else {
|
||||
handleResponse(ctx.getChannel(), streamIn, handler);
|
||||
}
|
||||
marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
|
||||
}
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
try {
|
||||
if (success) {
|
||||
IOUtils.close(streamIn);
|
||||
} else {
|
||||
IOUtils.closeWhileHandlingException(streamIn);
|
||||
}
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
||||
buffer.readerIndex(marker.expectedReaderIndex());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final TransportResponse response = handler.newInstance();
|
||||
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
response.remoteAddress();
|
||||
try {
|
||||
response.readFrom(buffer);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new TransportSerializationException(
|
||||
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (ThreadPool.Names.SAME.equals(handler.executor())) {
|
||||
//noinspection unchecked
|
||||
handler.handleResponse(response);
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
|
||||
private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
|
||||
Throwable error;
|
||||
try {
|
||||
error = buffer.readThrowable();
|
||||
} catch (Throwable e) {
|
||||
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
||||
}
|
||||
handleException(handler, error);
|
||||
}
|
||||
|
||||
private void handleException(final TransportResponseHandler handler, Throwable error) {
|
||||
if (!(error instanceof RemoteTransportException)) {
|
||||
error = new RemoteTransportException(error.getMessage(), error);
|
||||
}
|
||||
final RemoteTransportException rtx = (RemoteTransportException) error;
|
||||
if (ThreadPool.Names.SAME.equals(handler.executor())) {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Throwable e) {
|
||||
logger.error("failed to handle exception response [{}]", e, handler);
|
||||
}
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Throwable e) {
|
||||
logger.error("failed to handle exception response [{}]", e, handler);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
|
||||
Version version) throws IOException {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final String action = buffer.readString();
|
||||
transportServiceAdapter.onRequestReceived(requestId, action);
|
||||
NettyTransportChannel transportChannel = null;
|
||||
try {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
throw new ActionNotFoundTransportException(action);
|
||||
}
|
||||
if (reg.canTripCircuitBreaker()) {
|
||||
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
|
||||
} else {
|
||||
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
|
||||
}
|
||||
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName, messageLengthBytes);
|
||||
final TransportRequest request = reg.newRequest();
|
||||
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
request.readFrom(buffer);
|
||||
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
|
||||
validateRequest(marker, buffer, requestId, action);
|
||||
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
|
||||
//noinspection unchecked
|
||||
reg.processMessageReceived(request, transportChannel);
|
||||
} else {
|
||||
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// the circuit breaker tripped
|
||||
if (transportChannel == null) {
|
||||
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName, 0);
|
||||
}
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
logger.warn("Failed to send error message back to client for action [{}]", e, action);
|
||||
logger.warn("Actual Exception", e1);
|
||||
}
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
// This template method is needed to inject custom error checking logic in tests.
|
||||
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
|
||||
marker.validateRequest(buffer, requestId, action);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
transport.exceptionCaught(ctx, e);
|
||||
}
|
||||
|
||||
class ResponseHandler implements Runnable {
|
||||
|
||||
private final TransportResponseHandler handler;
|
||||
private final TransportResponse response;
|
||||
|
||||
public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
|
||||
this.handler = handler;
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.handleResponse(response);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RequestHandler extends AbstractRunnable {
|
||||
private final RequestHandlerRegistry reg;
|
||||
private final TransportRequest request;
|
||||
private final NettyTransportChannel transportChannel;
|
||||
|
||||
public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) {
|
||||
this.reg = reg;
|
||||
this.request = request;
|
||||
this.transportChannel = transportChannel;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
reg.processMessageReceived(request, transportChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return reg.isForceExecution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [{}]", e1, reg.getAction());
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal helper class to store characteristic offsets of a buffer during processing
|
||||
*/
|
||||
protected static final class Marker {
|
||||
private final ChannelBuffer buffer;
|
||||
private final int remainingMessageSize;
|
||||
private final int expectedReaderIndex;
|
||||
|
||||
public Marker(ChannelBuffer buffer) {
|
||||
this.buffer = buffer;
|
||||
// when this constructor is called, we have read already two parts of the message header: the marker bytes and the message
|
||||
// message length (see SizeHeaderFrameDecoder). Hence we have to rewind the index for MESSAGE_LENGTH_SIZE bytes to read the
|
||||
// remaining message length again.
|
||||
this.remainingMessageSize = buffer.getInt(buffer.readerIndex() - NettyHeader.MESSAGE_LENGTH_SIZE);
|
||||
this.expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes that have yet to be read from the buffer
|
||||
*/
|
||||
public int messageSizeWithRemainingHeaders() {
|
||||
return remainingMessageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number in bytes for the message including all headers (even the ones that have been read from the buffer already)
|
||||
*/
|
||||
public int messageSizeWithAllHeaders() {
|
||||
return remainingMessageSize + NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes for the message itself (excluding all headers).
|
||||
*/
|
||||
public int messageSize() {
|
||||
return messageSizeWithAllHeaders() - NettyHeader.HEADER_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the expected index of the buffer's reader after the message has been consumed entirely.
|
||||
*/
|
||||
public int expectedReaderIndex() {
|
||||
return expectedReaderIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a request has been fully read (not too few bytes but also not too many bytes).
|
||||
*
|
||||
* @param stream A stream that is associated with the buffer that is tracked by this marker.
|
||||
* @param requestId The current request id.
|
||||
* @param action The currently executed action.
|
||||
* @throws IOException Iff the stream could not be read.
|
||||
* @throws IllegalStateException Iff the request has not been fully read.
|
||||
*/
|
||||
public void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
|
||||
final int nextByte = stream.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message is fully read (request), yet there are "
|
||||
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedReaderIndex) {
|
||||
throw new IllegalStateException(
|
||||
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a response has been fully read (not too few bytes but also not too many bytes).
|
||||
*
|
||||
* @param stream A stream that is associated with the buffer that is tracked by this marker.
|
||||
* @param requestId The corresponding request id for this response.
|
||||
* @param handler The current response handler.
|
||||
* @param error Whether validate an error response.
|
||||
* @throws IOException Iff the stream could not be read.
|
||||
* @throws IllegalStateException Iff the request has not been fully read.
|
||||
*/
|
||||
public void validateResponse(StreamInput stream, long requestId,
|
||||
TransportResponseHandler<?> handler, boolean error) throws IOException {
|
||||
// Check the entire message has been read
|
||||
final int nextByte = stream.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
|
||||
+ handler + "], error [" + error + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message is fully read (response), yet there are "
|
||||
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
|
||||
+ "], handler [" + handler + "], error [" + error + "]; resetting");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NettyHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
public static final int REQUEST_ID_SIZE = 8;
|
||||
|
||||
public static final int STATUS_SIZE = 1;
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
/**
|
||||
* The magic number (must be lower than 0) for a ping message. This is handled
|
||||
* specifically in {@link org.elasticsearch.transport.netty.SizeHeaderFrameDecoder}.
|
||||
*/
|
||||
public static final int PING_DATA_SIZE = -1;
|
||||
private final static ChannelBuffer pingHeader;
|
||||
static {
|
||||
pingHeader = ChannelBuffers.buffer(6);
|
||||
pingHeader.writeByte('E');
|
||||
pingHeader.writeByte('S');
|
||||
pingHeader.writeInt(PING_DATA_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* A ping header is same as regular header, just with -1 for the size of the message.
|
||||
*/
|
||||
public static ChannelBuffer pingHeader() {
|
||||
return pingHeader.duplicate();
|
||||
}
|
||||
|
||||
public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) {
|
||||
int index = buffer.readerIndex();
|
||||
buffer.setByte(index, 'E');
|
||||
index += 1;
|
||||
buffer.setByte(index, 'S');
|
||||
index += 1;
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
buffer.setInt(index, buffer.readableBytes() - MARKER_BYTES_SIZE - MESSAGE_LENGTH_SIZE);
|
||||
index += MESSAGE_LENGTH_SIZE;
|
||||
buffer.setLong(index, requestId);
|
||||
index += REQUEST_ID_SIZE;
|
||||
buffer.setByte(index, status);
|
||||
index += STATUS_SIZE;
|
||||
buffer.setInt(index, version.id);
|
||||
}
|
||||
}
|
|
@ -27,11 +27,11 @@ import org.jboss.netty.logging.AbstractInternalLogger;
|
|||
*
|
||||
*/
|
||||
@SuppressLoggerChecks(reason = "safely delegates to logger")
|
||||
public class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
final class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public NettyInternalESLogger(ESLogger logger) {
|
||||
NettyInternalESLogger(ESLogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class NettyInternalESLoggerFactory extends InternalLoggerFactory {
|
||||
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TcpTransportChannel;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.WriteCompletionEvent;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
class NettyMessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
|
||||
NettyMessageChannelHandler(NettyTransport transport, String profileName) {
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
|
||||
transportServiceAdapter.sent(e.getWrittenAmount());
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Transports.assertTransportThread();
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
final ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
|
||||
try {
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size
|
||||
BytesReference reference = NettyUtils.toBytesReference(buffer, remainingMessageSize);
|
||||
transport.messageReceived(reference, ctx.getChannel(), profileName, remoteAddress, remainingMessageSize);
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
||||
buffer.readerIndex(expectedReaderIndex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
transport.exceptionCaught(ctx, e);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,176 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseOptions;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class NettyTransportChannel implements TransportChannel {
|
||||
|
||||
private final NettyTransport transport;
|
||||
private final TransportServiceAdapter transportServiceAdapter;
|
||||
private final Version version;
|
||||
private final String action;
|
||||
private final Channel channel;
|
||||
private final long requestId;
|
||||
private final String profileName;
|
||||
private final long reservedBytes;
|
||||
private final AtomicBoolean released = new AtomicBoolean();
|
||||
|
||||
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
|
||||
long requestId, Version version, String profileName, long reservedBytes) {
|
||||
this.transportServiceAdapter = transportServiceAdapter;
|
||||
this.version = version;
|
||||
this.transport = transport;
|
||||
this.action = action;
|
||||
this.channel = channel;
|
||||
this.requestId = requestId;
|
||||
this.profileName = profileName;
|
||||
this.reservedBytes = reservedBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String action() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) throws IOException {
|
||||
sendResponse(response, TransportResponseOptions.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
release();
|
||||
if (transport.compress) {
|
||||
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
|
||||
ReleasableBytesStreamOutput bStream = null;
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
if (options.compress()) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
|
||||
}
|
||||
stream.setVersion(version);
|
||||
response.writeTo(stream);
|
||||
stream.close();
|
||||
|
||||
ReleasablePagedBytesReference bytes = bStream.bytes();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
future.addListener(listener);
|
||||
addedReleaseListener = true;
|
||||
final TransportResponseOptions finalOptions = options;
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
|
||||
future.addListener(onResponseSentListener);
|
||||
} finally {
|
||||
if (!addedReleaseListener && bStream != null) {
|
||||
Releasables.close(bStream.bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
release();
|
||||
BytesStreamOutput stream = new BytesStreamOutput();
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(
|
||||
transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
stream.writeThrowable(tx);
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
status = TransportStatus.setError(status);
|
||||
|
||||
BytesReference bytes = stream.bytes();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
f -> transportServiceAdapter.onResponseSent(requestId, action, error);
|
||||
future.addListener(onResponseSentListener);
|
||||
}
|
||||
|
||||
private void release() {
|
||||
// attempt to release once atomically
|
||||
if (released.compareAndSet(false, true) == false) {
|
||||
throw new IllegalStateException("reserved bytes are already released");
|
||||
}
|
||||
transport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelType() {
|
||||
return "netty";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the underlying netty channel. This method is intended be used for access to netty to get additional
|
||||
* details when processing the request and may be used by plugins. Responses should be sent using the methods
|
||||
* defined in this class and not directly on the channel.
|
||||
* @return underlying netty channel
|
||||
*/
|
||||
public Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
}
|
|
@ -16,12 +16,12 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
|
@ -93,10 +93,11 @@ public class NettyUtils {
|
|||
}
|
||||
|
||||
static {
|
||||
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
|
||||
InternalLoggerFactory.setDefaultFactory(new InternalLoggerFactory() {
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
|
||||
name = name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty.");
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -136,6 +137,13 @@ public class NettyUtils {
|
|||
* Wraps the given ChannelBuffer with a BytesReference
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer) {
|
||||
return new ChannelBufferBytesReference(channelBuffer);
|
||||
return toBytesReference(channelBuffer, channelBuffer.readableBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the given ChannelBuffer with a BytesReference of a given size
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer, int size) {
|
||||
return new ChannelBufferBytesReference(channelBuffer, size);
|
||||
}
|
||||
}
|
|
@ -17,8 +17,9 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
@ -32,13 +33,14 @@ import org.jboss.netty.channel.ChannelState;
|
|||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
|
||||
|
||||
final Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
|
||||
final CounterMetric openChannelsMetric = new CounterMetric();
|
||||
|
@ -91,6 +93,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
|||
return totalChannelsMetric.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (Channel channel : openChannels) {
|
||||
channel.close().awaitUninterruptibly();
|
|
@ -19,107 +19,29 @@
|
|||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().bytes() * 0.9);
|
||||
final class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
final int sizeHeaderLength = NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
|
||||
if (buffer.readableBytes() < sizeHeaderLength) {
|
||||
try {
|
||||
boolean continueProcessing = TcpTransport.validateMessageHeader(NettyUtils.toBytesReference(buffer));
|
||||
buffer.skipBytes(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
return continueProcessing ? buffer : null;
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new TooLongFrameException(ex.getMessage(), ex);
|
||||
} catch (IllegalStateException ex) {
|
||||
return null;
|
||||
}
|
||||
|
||||
int readerIndex = buffer.readerIndex();
|
||||
if (buffer.getByte(readerIndex) != 'E' || buffer.getByte(readerIndex + 1) != 'S') {
|
||||
// special handling for what is probably HTTP
|
||||
if (bufferStartsWith(buffer, readerIndex, "GET ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "POST ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "PUT ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "HEAD ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "DELETE ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "OPTIONS ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "PATCH ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "TRACE ")) {
|
||||
|
||||
throw new HttpOnTransportException("This is not a HTTP port");
|
||||
}
|
||||
|
||||
// we have 6 readable bytes, show 4 (should be enough)
|
||||
throw new StreamCorruptedException("invalid internal transport message format, got ("
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 1) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 2) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 3) & 0xFF) + ")");
|
||||
}
|
||||
|
||||
int dataLen = buffer.getInt(buffer.readerIndex() + NettyHeader.MARKER_BYTES_SIZE);
|
||||
if (dataLen == NettyHeader.PING_DATA_SIZE) {
|
||||
// discard the messages we read and continue, this is achieved by skipping the bytes
|
||||
// and returning null
|
||||
buffer.skipBytes(sizeHeaderLength);
|
||||
return null;
|
||||
}
|
||||
if (dataLen <= 0) {
|
||||
throw new StreamCorruptedException("invalid data length: " + dataLen);
|
||||
}
|
||||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
|
||||
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < dataLen + sizeHeaderLength) {
|
||||
return null;
|
||||
}
|
||||
buffer.skipBytes(sizeHeaderLength);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private boolean bufferStartsWith(ChannelBuffer buffer, int readerIndex, String method) {
|
||||
char[] chars = method.toCharArray();
|
||||
for (int i = 0; i < chars.length; i++) {
|
||||
if (buffer.getByte(readerIndex + i) != chars[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper exception to mark an incoming connection as potentially being HTTP
|
||||
* so an appropriate error code can be returned
|
||||
*/
|
||||
public static class HttpOnTransportException extends ElasticsearchException {
|
||||
|
||||
public HttpOnTransportException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
return RestStatus.BAD_REQUEST;
|
||||
}
|
||||
|
||||
public HttpOnTransportException(StreamInput in) throws IOException{
|
||||
super(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
|
|||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.ActionTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -763,7 +764,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(122, null);
|
||||
ids.put(123, org.elasticsearch.indices.IndexAlreadyExistsException.class);
|
||||
ids.put(124, org.elasticsearch.script.Script.ScriptParseException.class);
|
||||
ids.put(125, org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class);
|
||||
ids.put(125, TcpTransport.HttpOnTransportException.class);
|
||||
ids.put(126, org.elasticsearch.index.mapper.MapperParsingException.class);
|
||||
ids.put(127, org.elasticsearch.search.SearchContextException.class);
|
||||
ids.put(128, org.elasticsearch.search.builder.SearchSourceBuilderException.class);
|
||||
|
|
|
@ -162,20 +162,6 @@ public class ChannelsTests extends ESTestCase {
|
|||
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
|
||||
}
|
||||
|
||||
|
||||
public void testWriteFromChannel() throws IOException {
|
||||
int length = randomIntBetween(1, randomBytes.length / 2);
|
||||
int offset = randomIntBetween(0, randomBytes.length - length);
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(randomBytes);
|
||||
ChannelBuffer source = new ByteBufferBackedChannelBuffer(byteBuffer);
|
||||
Channels.writeToChannel(source, offset, length, fileChannel);
|
||||
|
||||
BytesReference copyRef = new BytesArray(Channels.readFromFileChannel(fileChannel, 0, length));
|
||||
BytesReference sourceRef = new BytesArray(randomBytes, offset, length);
|
||||
|
||||
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
|
||||
}
|
||||
|
||||
class MockFileChannel extends FileChannel {
|
||||
|
||||
FileChannel delegate;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
|
@ -324,16 +324,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testVoidMessageCompressed() {
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
(request, channel) -> {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -17,13 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
@ -32,11 +30,11 @@ import java.util.List;
|
|||
|
||||
import static java.net.InetAddress.getByName;
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.elasticsearch.transport.netty.NettyTransport.resolvePublishPort;
|
||||
import static org.elasticsearch.transport.TcpTransport.resolvePublishPort;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class NettyPublishPortTests extends ESTestCase {
|
||||
public class PublishPortTests extends ESTestCase {
|
||||
|
||||
public void testPublishPort() throws Exception {
|
||||
int boundPort = randomIntBetween(9000, 9100);
|
|
@ -17,17 +17,17 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
/** Unit tests for NettyTransport */
|
||||
public class NettyTransportTests extends ESTestCase {
|
||||
|
||||
/** Unit tests for TCPTransport */
|
||||
public class TCPTransportTests extends ESTestCase {
|
||||
|
||||
/** Test ipv4 host with a default port works */
|
||||
public void testParseV4DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -36,19 +36,19 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with a default port range works */
|
||||
public void testParseV4DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
|
||||
assertEquals("127.0.0.1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv4 host with port works */
|
||||
public void testParseV4WithPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -57,7 +57,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with port range works */
|
||||
public void testParseV4WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -70,7 +70,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
/** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */
|
||||
public void testParseV6UnBracketed() throws Exception {
|
||||
try {
|
||||
NettyTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
TcpTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
fail("should have gotten exception");
|
||||
} catch (IllegalArgumentException expected) {
|
||||
assertTrue(expected.getMessage().contains("must be bracketed"));
|
||||
|
@ -79,7 +79,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port works */
|
||||
public void testParseV6DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -88,19 +88,19 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port range works */
|
||||
public void testParseV6DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
|
||||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv6 host with port works */
|
||||
public void testParseV6WithPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -109,7 +109,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with port range works */
|
||||
public void testParseV6WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -118,10 +118,10 @@ public class NettyTransportTests extends ESTestCase {
|
|||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(2346, addresses[1].getPort());
|
||||
}
|
||||
|
||||
|
||||
/** Test per-address limit */
|
||||
public void testAddressLimit() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:100-200", "1000", 3);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:100-200", "1000", 3);
|
||||
assertEquals(3, addresses.length);
|
||||
assertEquals(100, addresses[0].getPort());
|
||||
assertEquals(101, addresses[1].getPort());
|
|
@ -16,12 +16,13 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
|
@ -19,7 +19,6 @@
|
|||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
|
@ -33,6 +32,7 @@ import org.elasticsearch.test.transport.MockTransportService;
|
|||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -56,7 +56,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TransportSettings.PORT.getKey(), 0)
|
||||
.put("cluster.name", "test")
|
||||
.build();
|
||||
|
@ -89,12 +89,12 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(100L));
|
||||
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(100L));
|
||||
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
|
||||
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
|
||||
}
|
||||
});
|
||||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
|
||||
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
|
||||
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
|
@ -137,15 +137,12 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
}).txGet();
|
||||
}
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(200L));
|
||||
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(200L));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
|
||||
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
|
||||
});
|
||||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
|
||||
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
|
||||
|
||||
Releasables.close(serviceA, serviceB);
|
||||
terminate(threadPool);
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
|||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
|
@ -98,45 +99,24 @@ public class NettyTransportIT extends ESIntegTestCase {
|
|||
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
protected String handleRequest(Channel channel, String profileName,
|
||||
StreamInput stream, long requestId, int messageLengthBytes, Version version,
|
||||
InetSocketAddress remoteAddress) throws IOException {
|
||||
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
|
||||
remoteAddress);
|
||||
channelProfileName = TransportSettings.DEFAULT_PROFILE;
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings groupSettings) {
|
||||
return new ErrorPipelineFactory(this, name, groupSettings);
|
||||
}
|
||||
|
||||
private static class ErrorPipelineFactory extends ServerChannelPipelineFactory {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport nettyTransport, String name, Settings groupSettings) {
|
||||
super(nettyTransport, name, groupSettings);
|
||||
this.logger = nettyTransport.logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = super.getPipeline();
|
||||
pipeline.replace("dispatcher", "dispatcher",
|
||||
new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {
|
||||
|
||||
@Override
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId,
|
||||
int messageLengthBytes, Version version) throws IOException {
|
||||
String action = super.handleRequest(channel, marker, buffer, requestId, messageLengthBytes, version);
|
||||
channelProfileName = this.profileName;
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
|
||||
super.validateRequest(marker, buffer, requestId, action);
|
||||
String error = threadPool.getThreadContext().getHeader("ERROR");
|
||||
if (error != null) {
|
||||
throw new ElasticsearchException(error);
|
||||
}
|
||||
}
|
||||
});
|
||||
return pipeline;
|
||||
protected void validateRequest(StreamInput buffer, long requestId, String action)
|
||||
throws IOException {
|
||||
super.validateRequest(buffer, requestId, action);
|
||||
String error = threadPool.getThreadContext().getHeader("ERROR");
|
||||
if (error != null) {
|
||||
throw new ElasticsearchException(error);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
|
@ -30,6 +29,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.junit.Before;
|
||||
|
@ -58,7 +58,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -74,7 +74,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -91,7 +91,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -107,7 +107,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -125,7 +125,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -133,14 +133,13 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
|
||||
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
TcpTransport<?> transport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
transport.start();
|
||||
|
||||
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
return nettyTransport;
|
||||
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
return transport;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -89,6 +89,7 @@ import org.elasticsearch.search.SearchService;
|
|||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.transport.AssertingLocalTransport;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
@ -421,9 +422,9 @@ public final class InternalTestCluster extends TestCluster {
|
|||
// randomize netty settings
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(NettyTransport.WORKER_COUNT.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(NettyTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
|
||||
builder.put(NettyTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(NettyTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
|
@ -455,7 +456,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(NettyTransport.PING_SCHEDULE.getKey(), RandomInts.randomIntBetween(random, 100, 2000) + "ms");
|
||||
builder.put(TcpTransport.PING_SCHEDULE.getKey(), RandomInts.randomIntBetween(random, 100, 2000) + "ms");
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
|
|
Loading…
Reference in New Issue