Introduce a low level protocol handshake (#22094)

Today we rely on the version that the API user passes in together with the DiscoveryNode. This commit introduces a low level handshake where nodes exchange their version to be used with the transport protocol that is executed every time a connection to a node is established. This, on the one hand allows to change the wire protocol based on the version we are talking to even without a full cluster restart. Today we would need to carry on a BWC layer across major versions but with a handshake we can rely on the fact that the latest version of the previous minor executes a handshake and uses the latest protocol version across all communication with the N+1 version nodes.

This change is yet fully backwards compatible, a followup PR will remove the BWC in 6.0 once this has been back-ported to the 5.x branch
This commit is contained in:
Simon Willnauer 2016-12-13 21:06:23 +01:00 committed by GitHub
parent ce86405394
commit 7a9b667e98
14 changed files with 449 additions and 129 deletions

View File

@ -44,16 +44,19 @@ public final class ConnectionProfile {
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE))), 1, null);
TransportRequestOptions.Type.STATE))), 1, null, null);
private final List<ConnectionTypeHandle> handles;
private final int numConnections;
private final TimeValue connectTimeout;
private final TimeValue handshakeTimeout;
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout) {
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout)
{
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
this.handshakeTimeout = handshakeTimeout;
}
/**
@ -64,9 +67,10 @@ public final class ConnectionProfile {
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
private int offset = 0;
private TimeValue connectTimeout;
private TimeValue handshakeTimeout;
/**
* Sets a connect connectTimeout for this connection profile
* Sets a connect timeout for this connection profile
*/
public void setConnectTimeout(TimeValue connectTimeout) {
if (connectTimeout.millis() < 0) {
@ -75,6 +79,16 @@ public final class ConnectionProfile {
this.connectTimeout = connectTimeout;
}
/**
* Sets a handshake timeout for this connection profile
*/
public void setHandshakeTimeout(TimeValue handshakeTimeout) {
if (handshakeTimeout.millis() < 0) {
throw new IllegalArgumentException("handshakeTimeout must be non-negative but was: " + handshakeTimeout);
}
this.handshakeTimeout = handshakeTimeout;
}
/**
* Adds a number of connections for one or more types. Each type can only be added once.
* @param numConnections the number of connections to use in the pool for the given connection types
@ -104,7 +118,7 @@ public final class ConnectionProfile {
if (types.isEmpty() == false) {
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
}
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout);
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout, handshakeTimeout);
}
}
@ -116,6 +130,13 @@ public final class ConnectionProfile {
return connectTimeout;
}
/**
* Returns the handshake timeout or <code>null</code> if no explicit timeout is set on this profile.
*/
public TimeValue getHandshakeTimeout() {
return handshakeTimeout;
}
/**
* Returns the total number of connections for this profile
*/

View File

@ -20,6 +20,8 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.LongObjectHashMap;
import com.carrotsearch.hppc.LongObjectMap;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
@ -91,6 +93,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -145,9 +148,11 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE,
Setting.Property.NodeScope);
// test-setting only
static final Setting<Boolean> CONNECTION_HANDSHAKE = Setting.boolSetting("transport.tcp.handshake", true);
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final int PING_DATA_SIZE = -1;
protected final boolean blockingClient;
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
@ -156,6 +161,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final ThreadPool threadPool;
private final BigArrays bigArrays;
protected final NetworkService networkService;
private final boolean doHandshakes;
protected volatile TransportServiceAdapter transportServiceAdapter;
// node id to actual channel
@ -174,6 +180,11 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;
private final LongObjectMap<TransportResponseHandler<?>> pendingHandshakes = new LongObjectHashMap<>();
private final AtomicLong requestIdGenerator = new AtomicLong();
private final CounterMetric numHandshakes = new CounterMetric();
private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -189,6 +200,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
this.transportName = transportName;
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
this.doHandshakes = CONNECTION_HANDSHAKE.get(settings);
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
@ -224,17 +236,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public void transportServiceAdapter(TransportServiceAdapter service) {
if (service.getRequestHandler(HANDSHAKE_ACTION_NAME) != null) {
throw new IllegalStateException(HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered");
}
this.transportServiceAdapter = service;
}
public Settings settings() {
return this.settings;
}
public boolean isCompressed() {
return compress;
}
public class ScheduledPing extends AbstractLifecycleRunnable {
/**
@ -312,23 +319,36 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
public final class NodeChannels implements Connection {
private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping
= new EnumMap<>(TransportRequestOptions.Type.class);
private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping;
private final Channel[] channels;
private final DiscoveryNode node;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Version version;
public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) {
this.node = node;
this.channels = channels;
assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == "
+ connectionProfile.getNumConnections() + " but was: [" + channels.length + "]";
typeMapping = new EnumMap<>(TransportRequestOptions.Type.class);
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
for (TransportRequestOptions.Type type : handle.getTypes())
typeMapping.put(type, handle);
}
version = node.getVersion();
}
NodeChannels(NodeChannels channels, Version handshakeVersion) {
this.node = channels.node;
this.channels = channels.channels;
this.typeMapping = channels.typeMapping;
this.version = handshakeVersion;
}
@Override
public Version getVersion() {
return version;
}
public boolean hasChannel(Channel channel) {
for (Channel channel1 : channels) {
@ -370,7 +390,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
throw new NodeNotConnectedException(node, "connection already closed");
}
Channel channel = channel(options.type());
sendRequestToChannel(this.node, channel, requestId, action, request, options);
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte)0);
}
}
@ -408,6 +428,22 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
"failed to connect to [{}], cleaning dangling connections", node), e);
throw e;
}
if (doHandshakes) { // some tests need to disable this
Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING);
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout():
connectionProfile.getConnectTimeout();
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
Version version = executeHandshake(node, channel, handshakeTimeout);
if (version != null) {
// this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported
// this will go away in master once it's all ported to 5.2 but for now we keep this to make
// the backport straight forward
nodeChannels = new NodeChannels(nodeChannels, version);
}
}
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
@ -486,7 +522,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
@Override
public Connection getConnection(DiscoveryNode node) {
public NodeChannels getConnection(DiscoveryNode node) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
throw new NodeNotConnectedException(node, "Node not connected");
@ -521,7 +557,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected Map<String, Settings> buildProfileSettings() {
// extract default profile first and create standard bootstrap
Map<String, Settings> profiles = TransportSettings.TRANSPORT_PROFILES_SETTING.get(settings()).getAsGroups(true);
Map<String, Settings> profiles = TransportSettings.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true);
if (!profiles.containsKey(TransportSettings.DEFAULT_PROFILE)) {
profiles = new HashMap<>(profiles);
profiles.put(TransportSettings.DEFAULT_PROFILE, Settings.EMPTY);
@ -894,14 +930,13 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return compress && (!(request instanceof BytesTransportRequest));
}
protected void sendRequestToChannel(DiscoveryNode node, Channel targetChannel, final long requestId, final String action,
final TransportRequest request, TransportRequestOptions options) throws IOException,
private void sendRequestToChannel(DiscoveryNode node, final Channel targetChannel, final long requestId, final String action,
final TransportRequest request, TransportRequestOptions options, Version channelVersion,
byte status) throws IOException,
TransportException {
if (compress) {
options = TransportRequestOptions.builder(options).withCompress(true).build();
}
byte status = 0;
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
// we wrap this in a release once since if the onRequestSent callback throws an exception
@ -920,7 +955,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.min(getCurrentVersion(), node.getVersion());
Version version = Version.min(getCurrentVersion(), channelVersion);
stream.setVersion(version);
threadPool.getThreadContext().writeTo(stream);
@ -995,10 +1030,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
*/
public void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId,
final String action, TransportResponseOptions options) throws IOException {
sendResponse(nodeVersion, channel, response, requestId, action, options, (byte)0);
}
private void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId,
final String action, TransportResponseOptions options, byte status) throws IOException {
if (compress) {
options = TransportResponseOptions.builder(options).withCompress(true).build();
}
byte status = 0;
status = TransportStatus.setResponse(status); // TODO share some code with sendRequest
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
// we wrap this in a release once since if the onRequestSent callback throws an exception
@ -1129,6 +1168,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return false;
}
}
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen);
}
@ -1218,9 +1258,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
streamIn.setVersion(version);
threadPool.getThreadContext().readHeaders(streamIn);
if (TransportStatus.isRequest(status)) {
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress);
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
} else {
final TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
final TransportResponseHandler<?> handler;
if (TransportStatus.isHandshake(status) && doHandshakes) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
handler = theHandler;
}
}
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
@ -1297,29 +1347,35 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
});
}
protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId,
int messageLengthBytes, Version version, InetSocketAddress remoteAddress) throws IOException {
protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId, int messageLengthBytes,
Version version, InetSocketAddress remoteAddress, byte status) throws IOException {
final String action = stream.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
if (reg.canTripCircuitBreaker()) {
getInFlightRequestBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
if (TransportStatus.isHandshake(status) && doHandshakes) {
final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
TransportStatus.setHandshake((byte)0));
} else {
getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
if (reg.canTripCircuitBreaker()) {
getInFlightRequestBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName,
messageLengthBytes);
final TransportRequest request = reg.newRequest();
request.remoteAddress(new TransportAddress(remoteAddress));
request.readFrom(stream);
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
validateRequest(stream, requestId, action);
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
}
transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName,
messageLengthBytes);
final TransportRequest request = reg.newRequest();
request.remoteAddress(new TransportAddress(remoteAddress));
request.readFrom(stream);
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
validateRequest(stream, requestId, action);
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
} catch (Exception e) {
// the circuit breaker tripped
if (transportChannel == null) {
@ -1384,4 +1440,119 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
}
private static final class VersionHandshakeResponse extends TransportResponse {
private Version version;
private VersionHandshakeResponse(Version version) {
this.version = version;
}
private VersionHandshakeResponse() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
version = Version.readVersion(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
assert version != null;
Version.writeVersion(version, out);
}
}
// pkg private for testing
final Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Version> versionRef = new AtomicReference<>();
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
AtomicBoolean handshakeNotSupported = new AtomicBoolean(false);
numHandshakes.inc();
final long requestId = newRequestId();
pendingHandshakes.put(requestId, new TransportResponseHandler<VersionHandshakeResponse>() {
@Override
public VersionHandshakeResponse newInstance() {
return new VersionHandshakeResponse();
}
@Override
public void handleResponse(VersionHandshakeResponse response) {
final boolean success = versionRef.compareAndSet(null, response.version);
assert success;
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
Throwable cause = exp.getCause();
if (cause != null
&& cause instanceof ActionNotFoundTransportException
// this will happen if we talk to a node (pre 5.2) that doesn't haven a handshake handler
// we will just treat the node as a 5.0.0 node unless the discovery node that is used to connect has a higher version.
&& cause.getMessage().equals("No handler for action [internal:tcp/handshake]")) {
handshakeNotSupported.set(true);
} else {
final boolean success = exceptionRef.compareAndSet(null, exp);
assert success;
}
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
boolean success = false;
try {
// for the request we use the minCompatVersion since we don't know what's the version of the node we talk to
// we also have no payload on the request but the response will contain the actual version of the node we talk
// to as the payload.
final Version minCompatVersion = getCurrentVersion().minimumCompatibilityVersion();
sendRequestToChannel(node, channel, requestId, HANDSHAKE_ACTION_NAME, TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY, minCompatVersion, TransportStatus.setHandshake((byte)0));
if (latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) {
throw new ConnectTransportException(node, "handshake_timeout[" + timeout + "]");
}
success = true;
if (handshakeNotSupported.get()) {
// this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported
// this will go away in master once it's all ported to 5.2 but for now we keep this to make
// the backport straight forward
return null;
}
if (exceptionRef.get() != null) {
throw new IllegalStateException("handshake failed", exceptionRef.get());
} else {
Version version = versionRef.get();
if (getCurrentVersion().isCompatible(version) == false) {
throw new IllegalStateException("Received message from unsupported version: [" + version
+ "] minimal compatible version is: [" + getCurrentVersion().minimumCompatibilityVersion() + "]");
}
return version;
}
} finally {
final TransportResponseHandler<?> removedHandler = pendingHandshakes.remove(requestId);
// in the case of a timeout or an exception on the send part the handshake has not been removed yet.
// but the timeout is tricky since it's basically a race condition so we only assert on the success case.
assert success && removedHandler == null || success == false : "handler for requestId [" + requestId + "] is not been removed";
}
}
final int getNumPendingHandshakes() { // for testing
return pendingHandshakes.size();
}
final long getNumHandshakes() {
return numHandshakes.count(); // for testing
}
@Override
public long newRequestId() {
return requestIdGenerator.incrementAndGet();
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@ -83,6 +84,11 @@ public interface Transport extends LifecycleComponent {
return new NoopCircuitBreaker("in-flight-noop");
}
/**
* Returns a new request ID to use when sending a message via {@link Connection#sendRequest(long, String,
* TransportRequest, TransportRequestOptions)}
*/
long newRequestId();
/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecylce of this connection is maintained by the Transport
@ -116,5 +122,12 @@ public interface Transport extends LifecycleComponent {
*/
void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws
IOException, TransportException;
/**
* Returns the version of the node this connection was established with.
*/
default Version getVersion() {
return getNode().getVersion();
}
}
}

View File

@ -86,8 +86,6 @@ public class TransportService extends AbstractLifecycleComponent {
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
private final AtomicLong requestIds = new AtomicLong();
final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<>();
private final TransportInterceptor interceptor;
@ -520,7 +518,7 @@ public class TransportService extends AbstractLifecycleComponent {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
final long requestId = newRequestId();
final long requestId = transport.newRequestId();
final TimeoutHandler timeoutHandler;
try {
@ -643,10 +641,6 @@ public class TransportService extends AbstractLifecycleComponent {
return true;
}
private long newRequestId() {
return requestIds.getAndIncrement();
}
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
}

View File

@ -24,6 +24,7 @@ final class TransportStatus {
private static final byte STATUS_REQRES = 1 << 0;
private static final byte STATUS_ERROR = 1 << 1;
private static final byte STATUS_COMPRESS = 1 << 2;
private static final byte STATUS_HANDSHAKE = 1 << 3;
public static boolean isRequest(byte value) {
return (value & STATUS_REQRES) == 0;
@ -56,4 +57,15 @@ final class TransportStatus {
value |= STATUS_COMPRESS;
return value;
}
static boolean isHandshake(byte value) { // pkg private since it's only used internally
return (value & STATUS_HANDSHAKE) != 0;
}
static byte setHandshake(byte value) { // pkg private since it's only used internally
value |= STATUS_HANDSHAKE;
return value;
}
}

View File

@ -48,7 +48,9 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
abstract class FailAndRetryMockTransport<Response extends TransportResponse> implements Transport {
@ -63,6 +65,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
private final AtomicInteger failures = new AtomicInteger();
private final AtomicInteger successes = new AtomicInteger();
private final Set<DiscoveryNode> triedNodes = new CopyOnWriteArraySet<>();
private final AtomicLong requestId = new AtomicLong();
FailAndRetryMockTransport(Random random, ClusterName clusterName) {
this.random = new Random(random.nextLong());
@ -221,4 +224,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return Collections.emptyMap();
}
@Override
public long newRequestId() {
return requestId.incrementAndGet();
}
}

View File

@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
@ -170,7 +171,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
final class MockTransport implements Transport {
private final AtomicLong requestId = new AtomicLong();
Set<DiscoveryNode> connectedNodes = ConcurrentCollections.newConcurrentSet();
volatile boolean randomConnectionExceptions = false;
@ -249,6 +250,11 @@ public class NodeConnectionsServiceTests extends ESTestCase {
return null;
}
@Override
public long newRequestId() {
return requestId.incrementAndGet();
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -207,7 +207,7 @@ public class TCPTransportTests extends ESTestCase {
}
@Override
public Connection getConnection(DiscoveryNode node) {
public NodeChannels getConnection(DiscoveryNode node) {
return new NodeChannels(node, new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()],
ConnectionProfile.LIGHT_PROFILE);
}

View File

@ -23,9 +23,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
@ -66,10 +66,10 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
final ByteBuf buffer = (ByteBuf) msg;
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
try {
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size
// buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size
BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize);
transport.messageReceived(reference, ctx.channel(), profileName, remoteAddress, remainingMessageSize);
} finally {

View File

@ -111,9 +111,9 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase {
protected String handleRequest(Channel channel, String profileName,
StreamInput stream, long requestId, int messageLengthBytes, Version version,
InetSocketAddress remoteAddress) throws IOException {
InetSocketAddress remoteAddress, byte status) throws IOException {
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
remoteAddress);
remoteAddress, status);
channelProfileName = TransportSettings.DEFAULT_PROFILE;
return action;
}

View File

@ -51,6 +51,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.lucene.util.LuceneTestCase.rarely;
@ -75,6 +76,8 @@ public class CapturingTransport implements Transport {
private ConcurrentMap<Long, Tuple<DiscoveryNode, String>> requests = new ConcurrentHashMap<>();
private BlockingQueue<CapturedRequest> capturedRequests = ConcurrentCollections.newBlockingQueue();
private final AtomicLong requestId = new AtomicLong();
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
public CapturedRequest[] capturedRequests() {
@ -279,6 +282,10 @@ public class CapturingTransport implements Transport {
}
@Override
public long newRequestId() {
return requestId.incrementAndGet();
}
public Connection getConnection(DiscoveryNode node) {
try {
return openConnection(node, null);
@ -286,5 +293,4 @@ public class CapturingTransport implements Transport {
throw new UncheckedIOException(e);
}
}
}

View File

@ -500,6 +500,11 @@ public final class MockTransportService extends TransportService {
return transport.getLocalAddresses();
}
@Override
public long newRequestId() {
return transport.newRequestId();
}
@Override
public Connection getConnection(DiscoveryNode node) {
return new FilteredConnection(transport.getConnection(node)) {
@ -688,4 +693,12 @@ public final class MockTransportService extends TransportService {
connection.close();
}
}
public Transport getOriginalTransport() {
Transport transport = transport();
while (transport instanceof DelegateTransport) {
transport = ((DelegateTransport) transport).transport;
}
return transport;
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListenerResponseHandler;
@ -117,7 +118,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
};
serviceA.addConnectionListener(waitForConnection);
serviceB.addConnectionListener(waitForConnection);
int numHandshakes = 1;
if (useLocalNode) {
logger.info("--> using local node optimization");
serviceA.setLocalNode(nodeA);
@ -126,36 +127,69 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
logger.info("--> actively connecting to local node");
serviceA.connectToNode(nodeA);
serviceB.connectToNode(nodeB);
assertNumHandshakes(numHandshakes, serviceA.getOriginalTransport());
assertNumHandshakes(numHandshakes, serviceB.getOriginalTransport());
numHandshakes++;
}
serviceA.connectToNode(nodeB);
serviceB.connectToNode(nodeA);
assertNumHandshakes(numHandshakes, serviceA.getOriginalTransport());
assertNumHandshakes(numHandshakes, serviceB.getOriginalTransport());
assertThat("failed to wait for all nodes to connect", latch.await(5, TimeUnit.SECONDS), equalTo(true));
serviceA.removeConnectionListener(waitForConnection);
serviceB.removeConnectionListener(waitForConnection);
}
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests) {
MockTransportService service = build(
Settings.builder()
.put(settings)
.put(Node.NODE_NAME_SETTING.getKey(), name)
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version,
clusterSettings);
service.acceptIncomingRequests();
if (acceptRequests) {
service.acceptIncomingRequests();
}
return service;
}
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
return buildService(name, version, clusterSettings, Settings.EMPTY, true);
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
serviceA.close();
serviceB.close();
terminate(threadPool);
try {
assertNoPendingHandshakes(serviceA.getOriginalTransport());
assertNoPendingHandshakes(serviceB.getOriginalTransport());
} finally {
IOUtils.close(serviceA, serviceB, () -> {
try {
terminate(threadPool);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
public void assertNumHandshakes(long expected, Transport transport) {
if (transport instanceof TcpTransport) {
assertEquals(expected, ((TcpTransport) transport).getNumHandshakes());
}
}
public void assertNoPendingHandshakes(Transport transport) {
if (transport instanceof TcpTransport) {
assertEquals(0, ((TcpTransport) transport).getNumPendingHandshakes());
}
}
public void testHelloWorld() {
@ -1426,56 +1460,52 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testBlockingIncomingRequests() throws Exception {
TransportService service = build(
Settings.builder()
.put("name", "TS_TEST")
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version0,
null);
AtomicBoolean requestProcessed = new AtomicBoolean();
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
requestProcessed.set(true);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
try (TransportService service = buildService("TS_TEST", version0, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), false)) {
AtomicBoolean requestProcessed = new AtomicBoolean(false);
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
requestProcessed.set(true);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
DiscoveryNode node =
new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceA.close();
serviceA = buildService("TS_A", version0, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true);
serviceA.connectToNode(node);
CountDownLatch latch = new CountDownLatch(1);
serviceA.sendRequest(node, "action", new TestRequest(), new TransportResponseHandler<TestResponse>() {
@Override
public TestResponse newInstance() {
return new TestResponse();
}
@Override
public void handleResponse(TestResponse response) {
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
DiscoveryNode node =
new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceA.connectToNode(node);
assertFalse(requestProcessed.get());
CountDownLatch latch = new CountDownLatch(1);
serviceA.sendRequest(node, "action", new TestRequest(), new TransportResponseHandler<TestResponse>() {
@Override
public TestResponse newInstance() {
return new TestResponse();
}
@Override
public void handleResponse(TestResponse response) {
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
assertFalse(requestProcessed.get());
service.acceptIncomingRequests();
assertBusy(() -> assertTrue(requestProcessed.get()));
latch.await();
service.close();
service.acceptIncomingRequests();
assertBusy(() -> assertTrue(requestProcessed.get()));
latch.await();
}
}
public static class TestRequest extends TransportRequest {
@ -1752,21 +1782,69 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
// connection with one connection and a large timeout -- should consume the one spot in the backlog queue
serviceA.connectToNode(first, builder.build());
builder.setConnectTimeout(TimeValue.timeValueMillis(1));
final ConnectionProfile profile = builder.build();
// now with the 1ms timeout we got and test that is it's applied
long startTime = System.nanoTime();
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
serviceA.connectToNode(second, profile);
});
final long now = System.nanoTime();
final long timeTaken = TimeValue.nsecToMSec(now - startTime);
assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]",
timeTaken < TimeValue.timeValueSeconds(5).millis());
assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]");
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) {
service.connectToNode(first, builder.build());
builder.setConnectTimeout(TimeValue.timeValueMillis(1));
final ConnectionProfile profile = builder.build();
// now with the 1ms timeout we got and test that is it's applied
long startTime = System.nanoTime();
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> service.connectToNode(second, profile));
final long now = System.nanoTime();
final long timeTaken = TimeValue.nsecToMSec(now - startTime);
assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]",
timeTaken < TimeValue.timeValueSeconds(5).millis());
assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]");
}
}
}
public void testTcpHandshake() throws IOException, InterruptedException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) {
// this acts like a node that doesn't have support for handshakes
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceA.connectToNode(node);
TcpTransport.NodeChannels connection = originalTransport.getConnection(node);
Version version = originalTransport.executeHandshake(node, connection.channel(TransportRequestOptions.Type.PING),
TimeValue.timeValueSeconds(10));
assertNull(version);
serviceA.disconnectFromNode(node);
}
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null)) {
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceA.connectToNode(node);
TcpTransport.NodeChannels connection = originalTransport.getConnection(node);
Version version = originalTransport.executeHandshake(node, connection.channel(TransportRequestOptions.Type.PING),
TimeValue.timeValueSeconds(10));
assertEquals(version, Version.CURRENT);
}
}
public void testTcpHandshakeTimeout() throws IOException {
try (ServerSocket socket = new ServerSocket()) {
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
socket.setReuseAddress(true);
DiscoveryNode dummy = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
socket.getLocalPort()), emptyMap(),
emptySet(), version0);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
builder.setHandshakeTimeout(TimeValue.timeValueMillis(1));
ConnectTransportException ex = expectThrows(ConnectTransportException.class,
() -> serviceA.connectToNode(dummy, builder.build()));
assertEquals("[][" + dummy.getAddress() +"] handshake_timeout[1ms]", ex.getMessage());
}
}
}

View File

@ -22,7 +22,6 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -70,7 +69,6 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
private final ExecutorService executor;
private final Version mockVersion;
@Inject
public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -97,7 +95,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
ServerSocket socket = new ServerSocket();
socket.bind(address);
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings()));
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.getBytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
@ -212,7 +210,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
if (tcpReceiveBufferSize.getBytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
}
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings()));
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
}
@Override