diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 8ff86e593c0..1af4f101e04 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -194,7 +194,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final NetworkService networkService; protected final Set profileSettings; - protected volatile TransportServiceAdapter transportServiceAdapter; + protected volatile TransportService transportService; // node id to actual channel protected final ConcurrentMap connectedNodes = newConcurrentMap(); @@ -270,11 +270,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } @Override - public void transportServiceAdapter(TransportServiceAdapter service) { + public void setTransportService(TransportService service) { if (service.getRequestHandler(HANDSHAKE_ACTION_NAME) != null) { throw new IllegalStateException(HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered"); } - this.transportServiceAdapter = service; + this.transportService = service; } private static class HandshakeResponseHandler implements TransportResponseHandler { @@ -444,7 +444,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i try { closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false); } finally { - transportServiceAdapter.onConnectionClosed(this); + transportService.onConnectionClosed(this); } } } @@ -500,7 +500,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i logger.debug("connected to node [{}]", node); } try { - transportServiceAdapter.onNodeConnected(node); + transportService.onNodeConnected(node); } finally { if (nodeChannels.isClosed()) { // we got closed concurrently due to a disconnect or some other event on the channel. @@ -512,7 +512,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i // try to remove it first either way one of the two wins even if the callback has run before we even added the // tuple to the map since in that case we remove it here again if (connectedNodes.remove(node, nodeChannels)) { - transportServiceAdapter.onNodeDisconnected(node); + transportService.onNodeDisconnected(node); } throw new NodeNotConnectedException(node, "connection concurrently closed"); } @@ -597,7 +597,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i connectTimeout : connectionProfile.getHandshakeTimeout(); final Version version = executeHandshake(node, channel, handshakeTimeout); nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version - transportServiceAdapter.onConnectionOpened(nodeChannels); + transportService.onConnectionOpened(nodeChannels); connectionRef.set(nodeChannels); success = true; return nodeChannels; @@ -625,7 +625,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i if (closeLock.readLock().tryLock()) { try { if (connectedNodes.remove(node, nodeChannels)) { - transportServiceAdapter.onNodeDisconnected(node); + transportService.onNodeDisconnected(node); } } finally { closeLock.readLock().unlock(); @@ -665,7 +665,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } finally { closeLock.readLock().unlock(); if (nodeChannels != null) { // if we found it and removed it we close and notify - IOUtils.closeWhileHandlingException(nodeChannels, () -> transportServiceAdapter.onNodeDisconnected(node)); + IOUtils.closeWhileHandlingException(nodeChannels, () -> transportService.onNodeDisconnected(node)); } } } @@ -916,7 +916,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i Map.Entry next = iterator.next(); try { IOUtils.closeWhileHandlingException(next.getValue()); - transportServiceAdapter.onNodeDisconnected(next.getKey()); + transportService.onNodeDisconnected(next.getKey()); } finally { iterator.remove(); } @@ -1078,7 +1078,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i final TransportRequestOptions finalOptions = options; // this might be called in a different thread SendListener onRequestSent = new SendListener(stream, - () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length()); + () -> transportService.onRequestSent(node, requestId, action, request, finalOptions), message.length()); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; } finally { @@ -1125,7 +1125,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); CompositeBytesReference message = new CompositeBytesReference(header, bytes); SendListener onResponseSent = new SendListener(null, - () -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length()); + () -> transportService.onResponseSent(requestId, action, error), message.length()); internalSendMessage(channel, message, onResponseSent); } } @@ -1160,7 +1160,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i final TransportResponseOptions finalOptions = options; // this might be called in a different thread SendListener listener = new SendListener(stream, - () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length()); + () -> transportService.onResponseSent(requestId, action, response, finalOptions), message.length()); internalSendMessage(channel, message, listener); addedReleaseListener = true; } finally { @@ -1356,14 +1356,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i if (isHandshake) { handler = pendingHandshakes.remove(requestId); } else { - TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId); + TransportResponseHandler theHandler = transportService.onResponseReceived(requestId); if (theHandler == null && TransportStatus.isError(status)) { handler = pendingHandshakes.remove(requestId); } else { handler = theHandler; } } - // ignore if its null, the adapter logs it + // ignore if its null, the service logs it if (handler != null) { if (TransportStatus.isError(status)) { handlerResponseError(streamIn, handler); @@ -1456,7 +1456,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId, int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) throws IOException { final String action = stream.readString(); - transportServiceAdapter.onRequestReceived(requestId, action); + transportService.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { if (TransportStatus.isHandshake(status)) { @@ -1464,7 +1464,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0)); } else { - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); + final RequestHandlerRegistry reg = transportService.getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException(action); } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 5d22e156d9d..b3471b942da 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -40,7 +40,7 @@ public interface Transport extends LifecycleComponent { Setting TRANSPORT_TCP_COMPRESS = Setting.boolSetting("transport.tcp.compress", false, Property.NodeScope); - void transportServiceAdapter(TransportServiceAdapter service); + void setTransportService(TransportService service); /** * The address the transport is bound on. diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index a68e319bb2c..fa8278dea36 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -106,8 +105,6 @@ public class TransportService extends AbstractLifecycleComponent { } }); - private final TransportService.Adapter adapter; - public static final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {}; // tracer log @@ -148,7 +145,7 @@ public class TransportService extends AbstractLifecycleComponent { * Build the service. * * @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings - * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. + * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. */ public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings) { @@ -160,7 +157,6 @@ public class TransportService extends AbstractLifecycleComponent { setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - adapter = createAdapter(); taskManager = createTaskManager(); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); @@ -187,10 +183,6 @@ public class TransportService extends AbstractLifecycleComponent { return taskManager; } - protected Adapter createAdapter() { - return new Adapter(); - } - protected TaskManager createTaskManager() { return new TaskManager(settings); } @@ -205,7 +197,7 @@ public class TransportService extends AbstractLifecycleComponent { @Override protected void doStart() { - transport.transportServiceAdapter(adapter); + transport.setTransportService(this); transport.start(); if (transport.boundAddress() != null && logger.isInfoEnabled()) { @@ -632,11 +624,11 @@ public class TransportService extends AbstractLifecycleComponent { } private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { - final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool); + final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, this, threadPool); try { - adapter.onRequestSent(localNode, requestId, action, request, options); - adapter.onRequestReceived(requestId, action); - final RequestHandlerRegistry reg = adapter.getRequestHandler(action); + onRequestSent(localNode, requestId, action, request, options); + onRequestReceived(requestId, action); + final RequestHandlerRegistry reg = getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException("Action [" + action + "] not found"); } @@ -782,177 +774,171 @@ public class TransportService extends AbstractLifecycleComponent { } } - protected RequestHandlerRegistry getRequestHandler(String action) { + /** called by the {@link Transport} implementation once a request has been sent */ + void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) { + if (traceEnabled() && shouldTraceAction(action)) { + traceRequestSent(node, requestId, action, options); + } + } + + protected boolean traceEnabled() { + return tracerLog.isTraceEnabled(); + } + + /** called by the {@link Transport} implementation once a response was sent to calling node */ + void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options) { + if (traceEnabled() && shouldTraceAction(action)) { + traceResponseSent(requestId, action); + } + } + + /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ + void onResponseSent(long requestId, String action, Exception e) { + if (traceEnabled() && shouldTraceAction(action)) { + traceResponseSent(requestId, action, e); + } + } + + protected void traceResponseSent(long requestId, String action, Exception e) { + tracerLog.trace( + (org.apache.logging.log4j.util.Supplier) + () -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e); + } + + /** + * called by the {@link Transport} implementation when an incoming request arrives but before + * any parsing of it has happened (with the exception of the requestId and action) + */ + void onRequestReceived(long requestId, String action) { + try { + blockIncomingRequestsLatch.await(); + } catch (InterruptedException e) { + logger.trace("interrupted while waiting for incoming requests block to be removed"); + } + if (traceEnabled() && shouldTraceAction(action)) { + traceReceivedRequest(requestId, action); + } + } + + public RequestHandlerRegistry getRequestHandler(String action) { return requestHandlers.get(action); } - protected class Adapter implements TransportServiceAdapter { + /** + * called by the {@link Transport} implementation when a response or an exception has been received for a previously + * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not + * found. + */ + public TransportResponseHandler onResponseReceived(final long requestId) { + RequestHolder holder = clientHandlers.remove(requestId); - @Override - public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) { - if (traceEnabled() && shouldTraceAction(action)) { - traceRequestSent(node, requestId, action, options); - } + if (holder == null) { + checkForTimeout(requestId); + return null; } - - protected boolean traceEnabled() { - return tracerLog.isTraceEnabled(); + holder.cancelTimeout(); + if (traceEnabled() && shouldTraceAction(holder.action())) { + traceReceivedResponse(requestId, holder.connection().getNode(), holder.action()); } + return holder.handler(); + } - @Override - public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options) { - if (traceEnabled() && shouldTraceAction(action)) { - traceResponseSent(requestId, action); - } - } - - @Override - public void onResponseSent(long requestId, String action, Exception e) { - if (traceEnabled() && shouldTraceAction(action)) { - traceResponseSent(requestId, action, e); - } - } - - protected void traceResponseSent(long requestId, String action, Exception e) { - tracerLog.trace( - (org.apache.logging.log4j.util.Supplier) - () -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e); - } - - @Override - public void onRequestReceived(long requestId, String action) { - try { - blockIncomingRequestsLatch.await(); - } catch (InterruptedException e) { - logger.trace("interrupted while waiting for incoming requests block to be removed"); - } - if (traceEnabled() && shouldTraceAction(action)) { - traceReceivedRequest(requestId, action); - } - } - - @Override - public RequestHandlerRegistry getRequestHandler(String action) { - return requestHandlers.get(action); - } - - @Override - public TransportResponseHandler onResponseReceived(final long requestId) { - RequestHolder holder = clientHandlers.remove(requestId); - - if (holder == null) { - checkForTimeout(requestId); - return null; - } - holder.cancelTimeout(); - if (traceEnabled() && shouldTraceAction(holder.action())) { - traceReceivedResponse(requestId, holder.connection().getNode(), holder.action()); - } - return holder.handler(); - } - - protected void checkForTimeout(long requestId) { - // lets see if its in the timeout holder, but sync on mutex to make sure any ongoing timeout handling has finished - final DiscoveryNode sourceNode; - final String action; - assert clientHandlers.get(requestId) == null; - TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId); - if (timeoutInfoHolder != null) { - long time = System.currentTimeMillis(); - logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " + + private void checkForTimeout(long requestId) { + // lets see if its in the timeout holder, but sync on mutex to make sure any ongoing timeout handling has finished + final DiscoveryNode sourceNode; + final String action; + assert clientHandlers.get(requestId) == null; + TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId); + if (timeoutInfoHolder != null) { + long time = System.currentTimeMillis(); + logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " + "action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), - timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId); - action = timeoutInfoHolder.action(); - sourceNode = timeoutInfoHolder.node(); - } else { - logger.warn("Transport response handler not found of id [{}]", requestId); - action = null; - sourceNode = null; - } - // call tracer out of lock - if (traceEnabled() == false) { - return; - } - if (action == null) { - assert sourceNode == null; - traceUnresolvedResponse(requestId); - } else if (shouldTraceAction(action)) { - traceReceivedResponse(requestId, sourceNode, action); - } + timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId); + action = timeoutInfoHolder.action(); + sourceNode = timeoutInfoHolder.node(); + } else { + logger.warn("Transport response handler not found of id [{}]", requestId); + action = null; + sourceNode = null; } - - @Override - public void onNodeConnected(final DiscoveryNode node) { - // capture listeners before spawning the background callback so the following pattern won't trigger a call - // connectToNode(); connection is completed successfully - // addConnectionListener(); this listener shouldn't be called - final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); - threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node))); + // call tracer out of lock + if (traceEnabled() == false) { + return; } - - @Override - public void onConnectionOpened(Transport.Connection connection) { - // capture listeners before spawning the background callback so the following pattern won't trigger a call - // connectToNode(); connection is completed successfully - // addConnectionListener(); this listener shouldn't be called - final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); - threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection))); + if (action == null) { + assert sourceNode == null; + traceUnresolvedResponse(requestId); + } else if (shouldTraceAction(action)) { + traceReceivedResponse(requestId, sourceNode, action); } + } - @Override - public void onNodeDisconnected(final DiscoveryNode node) { - try { - threadPool.generic().execute( () -> { - for (final TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onNodeDisconnected(node); - } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Rejected execution on NodeDisconnected", ex); - } + void onNodeConnected(final DiscoveryNode node) { + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node))); + } + + void onConnectionOpened(Transport.Connection connection) { + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection))); + } + + public void onNodeDisconnected(final DiscoveryNode node) { + try { + threadPool.generic().execute( () -> { + for (final TransportConnectionListener connectionListener : connectionListeners) { + connectionListener.onNodeDisconnected(node); + } + }); + } catch (EsRejectedExecutionException ex) { + logger.debug("Rejected execution on NodeDisconnected", ex); } + } - @Override - public void onConnectionClosed(Transport.Connection connection) { - try { - for (Map.Entry entry : clientHandlers.entrySet()) { - RequestHolder holder = entry.getValue(); - if (holder.connection().getCacheKey().equals(connection.getCacheKey())) { - final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey()); - if (holderToNotify != null) { - // callback that an exception happened, but on a different thread since we don't - // want handlers to worry about stack overflows - threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException( - connection.getNode(), holderToNotify.action()))); - } + void onConnectionClosed(Transport.Connection connection) { + try { + for (Map.Entry entry : clientHandlers.entrySet()) { + RequestHolder holder = entry.getValue(); + if (holder.connection().getCacheKey().equals(connection.getCacheKey())) { + final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey()); + if (holderToNotify != null) { + // callback that an exception happened, but on a different thread since we don't + // want handlers to worry about stack overflows + threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException( + connection.getNode(), holderToNotify.action()))); } } - } catch (EsRejectedExecutionException ex) { - logger.debug("Rejected execution on onConnectionClosed", ex); } + } catch (EsRejectedExecutionException ex) { + logger.debug("Rejected execution on onConnectionClosed", ex); } + } - protected void traceReceivedRequest(long requestId, String action) { - tracerLog.trace("[{}][{}] received request", requestId, action); - } + protected void traceReceivedRequest(long requestId, String action) { + tracerLog.trace("[{}][{}] received request", requestId, action); + } - protected void traceResponseSent(long requestId, String action) { - tracerLog.trace("[{}][{}] sent response", requestId, action); - } + protected void traceResponseSent(long requestId, String action) { + tracerLog.trace("[{}][{}] sent response", requestId, action); + } - protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode); - } + protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) { + tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode); + } - protected void traceUnresolvedResponse(long requestId) { - tracerLog.trace("[{}] received response but can't resolve it to a request", requestId); - } - - protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); - } + protected void traceUnresolvedResponse(long requestId) { + tracerLog.trace("[{}] received response but can't resolve it to a request", requestId); + } + protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); } class TimeoutHandler implements Runnable { @@ -1118,16 +1104,16 @@ public class TransportService extends AbstractLifecycleComponent { final DiscoveryNode localNode; private final String action; private final long requestId; - final TransportServiceAdapter adapter; + final TransportService service; final ThreadPool threadPool; DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId, - TransportServiceAdapter adapter, ThreadPool threadPool) { + TransportService service, ThreadPool threadPool) { this.logger = logger; this.localNode = localNode; this.action = action; this.requestId = requestId; - this.adapter = adapter; + this.service = service; this.threadPool = threadPool; } @@ -1148,9 +1134,9 @@ public class TransportService extends AbstractLifecycleComponent { @Override public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException { - adapter.onResponseSent(requestId, action, response, options); - final TransportResponseHandler handler = adapter.onResponseReceived(requestId); - // ignore if its null, the adapter logs it + service.onResponseSent(requestId, action, response, options); + final TransportResponseHandler handler = service.onResponseReceived(requestId); + // ignore if its null, the service logs it if (handler != null) { final String executor = handler.executor(); if (ThreadPool.Names.SAME.equals(executor)) { @@ -1172,9 +1158,9 @@ public class TransportService extends AbstractLifecycleComponent { @Override public void sendResponse(Exception exception) throws IOException { - adapter.onResponseSent(requestId, action, exception); - final TransportResponseHandler handler = adapter.onResponseReceived(requestId); - // ignore if its null, the adapter logs it + service.onResponseSent(requestId, action, exception); + final TransportResponseHandler handler = service.onResponseReceived(requestId); + // ignore if its null, the service logs it if (handler != null) { final RemoteTransportException rtx = wrapInRemote(exception); final String executor = handler.executor(); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java deleted file mode 100644 index 24a71a99998..00000000000 --- a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport; - -import org.elasticsearch.cluster.node.DiscoveryNode; - -public interface TransportServiceAdapter extends TransportConnectionListener { - - /** called by the {@link Transport} implementation once a request has been sent */ - void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options); - - /** called by the {@link Transport} implementation once a response was sent to calling node */ - void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options); - - /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ - void onResponseSent(long requestId, String action, Exception e); - - /** - * called by the {@link Transport} implementation when a response or an exception has been received for a previously - * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not - * found. - */ - TransportResponseHandler onResponseReceived(long requestId); - - /** - * called by the {@link Transport} implementation when an incoming request arrives but before - * any parsing of it has happened (with the exception of the requestId and action) - */ - void onRequestReceived(long requestId, String action); - - RequestHandlerRegistry getRequestHandler(String action); -} diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index de5c6690a34..8927fed567e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -174,10 +174,10 @@ public abstract class TaskManagerTestCase extends ESTestCase { return discoveryNode.get(); }; transportService = new TransportService(settings, - new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), + new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), new NetworkService(Collections.emptyList())), - threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) { + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) { @Override protected TaskManager createTaskManager() { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 50fb348834f..5141b9cd471 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -49,7 +49,7 @@ public class TransportBulkActionTests extends ESTestCase { private TransportService transportService; private ClusterService clusterService; private ThreadPool threadPool; - + private TestTransportBulkAction bulkAction; class TestTransportBulkAction extends TransportBulkAction { @@ -132,4 +132,4 @@ public class TransportBulkActionTests extends ESTestCase { throw new AssertionError(exception); })); } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index dbe85898209..9be0d55d77e 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -40,7 +40,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; import java.io.IOException; @@ -60,7 +60,7 @@ abstract class FailAndRetryMockTransport imp private boolean connectMode = true; - private TransportServiceAdapter transportServiceAdapter; + private TransportService transportService; private final AtomicInteger connectTransportExceptions = new AtomicInteger(); private final AtomicInteger failures = new AtomicInteger(); @@ -90,12 +90,12 @@ abstract class FailAndRetryMockTransport imp //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info if (connectMode) { if (TransportLivenessAction.NAME.equals(action)) { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + TransportResponseHandler transportResponseHandler = transportService.onResponseReceived(requestId); transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING. getDefault(Settings.EMPTY), node)); } else if (ClusterStateAction.NAME.equals(action)) { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + TransportResponseHandler transportResponseHandler = transportService.onResponseReceived(requestId); ClusterState clusterState = getMockClusterState(node); transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState, 0L)); } else { @@ -116,7 +116,7 @@ abstract class FailAndRetryMockTransport imp //throw whatever exception that is not a subclass of ConnectTransportException throw new IllegalStateException(); } else { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + TransportResponseHandler transportResponseHandler = transportService.onResponseReceived(requestId); if (random.nextBoolean()) { successes.incrementAndGet(); transportResponseHandler.handleResponse(newResponse()); @@ -163,8 +163,8 @@ abstract class FailAndRetryMockTransport imp } @Override - public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) { - this.transportServiceAdapter = transportServiceAdapter; + public void setTransportService(TransportService transportServiceAdapter) { + this.transportService = transportServiceAdapter; } @Override diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index ad24da029e7..9ff6ae06d17 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -61,7 +61,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 2e7a857cc7b..51908a45380 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.TransportStats; import org.junit.After; import org.junit.Before; @@ -176,7 +175,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { volatile boolean randomConnectionExceptions = false; @Override - public void transportServiceAdapter(TransportServiceAdapter service) { + public void setTransportService(TransportService service) { } @Override diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index feff696e942..b1725ead326 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -75,7 +75,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.IndexEventListener; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.elasticsearch.threadpool.ThreadPool; @@ -87,7 +86,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.function.Consumer; import java.util.stream.Collectors; import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 2ccddf6bc54..81fc934ca6d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -39,7 +39,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; import java.io.IOException; @@ -60,7 +60,7 @@ import static org.apache.lucene.util.LuceneTestCase.rarely; /** A transport class that doesn't send anything but rather captures all requests for inspection from tests */ public class CapturingTransport implements Transport { - private TransportServiceAdapter adapter; + private TransportService transportService; public static class CapturedRequest { public final DiscoveryNode node; @@ -137,7 +137,7 @@ public class CapturingTransport implements Transport { /** simulate a response for the given requestId */ public void handleResponse(final long requestId, final TransportResponse response) { - adapter.onResponseReceived(requestId).handleResponse(response); + transportService.onResponseReceived(requestId).handleResponse(response); } /** @@ -189,7 +189,7 @@ public class CapturingTransport implements Transport { * @param e the failure */ public void handleError(final long requestId, final TransportException e) { - adapter.onResponseReceived(requestId).handleException(e); + transportService.onResponseReceived(requestId).handleException(e); } @Override @@ -220,8 +220,8 @@ public class CapturingTransport implements Transport { } @Override - public void transportServiceAdapter(TransportServiceAdapter adapter) { - this.adapter = adapter; + public void setTransportService(TransportService transportService) { + this.transportService = transportService; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 503a7ae1f79..0979cfbfea2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -44,7 +44,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -58,7 +57,6 @@ import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.TransportStats; import java.io.IOException; @@ -101,16 +99,16 @@ public final class MockTransportService extends TransportService { } public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool, - @Nullable ClusterSettings clusterSettings) { + @Nullable ClusterSettings clusterSettings) { // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use // a different default port range per JVM unless the incoming settings override it int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port - settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort+100)).put(settings).build(); + settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); + new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); return createNewService(settings, transport, version, threadPool, clusterSettings); } @@ -118,8 +116,8 @@ public final class MockTransportService extends TransportService { @Nullable ClusterSettings clusterSettings) { return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> - new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), - Node.NODE_ATTRIBUTES.get(settings).getAsMap(), DiscoveryNode.getRolesFromSettings(settings), version), + new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), + Node.NODE_ATTRIBUTES.get(settings).getAsMap(), DiscoveryNode.getRolesFromSettings(settings), version), clusterSettings); } @@ -129,10 +127,10 @@ public final class MockTransportService extends TransportService { * Build the service. * * @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings - * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. + * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. */ public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, - @Nullable ClusterSettings clusterSettings) { + @Nullable ClusterSettings clusterSettings) { this(settings, transport, threadPool, interceptor, (boundAddress) -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), settings.get(Node.NODE_NAME_SETTING.getKey(), UUIDs.randomBase64UUID())), clusterSettings); @@ -142,7 +140,7 @@ public final class MockTransportService extends TransportService { * Build the service. * * @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings - * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. + * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. */ public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, @@ -163,7 +161,7 @@ public final class MockTransportService extends TransportService { protected TaskManager createTaskManager() { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings); - } else { + } else { return super.createTaskManager(); } } @@ -547,8 +545,8 @@ public final class MockTransportService extends TransportService { } @Override - public void transportServiceAdapter(TransportServiceAdapter service) { - transport.transportServiceAdapter(service); + public void setTransportService(TransportService service) { + transport.setTransportService(service); } @Override @@ -641,7 +639,9 @@ public final class MockTransportService extends TransportService { } @Override - public void close() { transport.close(); } + public void close() { + transport.close(); + } @Override public Map profileBoundAddresses() { @@ -704,55 +704,47 @@ public final class MockTransportService extends TransportService { } @Override - protected Adapter createAdapter() { - return new MockAdapter(); + protected boolean traceEnabled() { + return super.traceEnabled() || activeTracers.isEmpty() == false; } - class MockAdapter extends Adapter { - - @Override - protected boolean traceEnabled() { - return super.traceEnabled() || activeTracers.isEmpty() == false; + @Override + protected void traceReceivedRequest(long requestId, String action) { + super.traceReceivedRequest(requestId, action); + for (Tracer tracer : activeTracers) { + tracer.receivedRequest(requestId, action); } + } - @Override - protected void traceReceivedRequest(long requestId, String action) { - super.traceReceivedRequest(requestId, action); - for (Tracer tracer : activeTracers) { - tracer.receivedRequest(requestId, action); - } + @Override + protected void traceResponseSent(long requestId, String action) { + super.traceResponseSent(requestId, action); + for (Tracer tracer : activeTracers) { + tracer.responseSent(requestId, action); } + } - @Override - protected void traceResponseSent(long requestId, String action) { - super.traceResponseSent(requestId, action); - for (Tracer tracer : activeTracers) { - tracer.responseSent(requestId, action); - } + @Override + protected void traceResponseSent(long requestId, String action, Exception e) { + super.traceResponseSent(requestId, action, e); + for (Tracer tracer : activeTracers) { + tracer.responseSent(requestId, action, e); } + } - @Override - protected void traceResponseSent(long requestId, String action, Exception e) { - super.traceResponseSent(requestId, action, e); - for (Tracer tracer : activeTracers) { - tracer.responseSent(requestId, action, e); - } + @Override + protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) { + super.traceReceivedResponse(requestId, sourceNode, action); + for (Tracer tracer : activeTracers) { + tracer.receivedResponse(requestId, sourceNode, action); } + } - @Override - protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - super.traceReceivedResponse(requestId, sourceNode, action); - for (Tracer tracer : activeTracers) { - tracer.receivedResponse(requestId, sourceNode, action); - } - } - - @Override - protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - super.traceRequestSent(node, requestId, action, options); - for (Tracer tracer : activeTracers) { - tracer.requestSent(node, requestId, action, options); - } + @Override + protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + super.traceRequestSent(node, requestId, action, options); + for (Tracer tracer : activeTracers) { + tracer.requestSent(node, requestId, action, options); } } @@ -802,6 +794,7 @@ public final class MockTransportService extends TransportService { public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) { final AtomicBoolean closed = new AtomicBoolean(false); + @Override public void close() throws IOException { try { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 2e252d112df..da43f116d42 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1047,9 +1047,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public volatile boolean sawResponseReceived; public AtomicReference expectedEvents = new AtomicReference<>(); + Tracer(Set actions) { this.actions = actions; } + @Override public void receivedRequest(long requestId, String action) { super.receivedRequest(requestId, action); @@ -1446,7 +1448,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void handleException(TransportException exp) { Throwable cause = ExceptionsHelper.unwrapCause(exp); assertThat(cause, instanceOf(ConnectTransportException.class)); - assertThat(((ConnectTransportException)cause).node(), equalTo(nodeA)); + assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA)); } }); @@ -1456,7 +1458,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { Throwable cause = ExceptionsHelper.unwrapCause(e); assertThat(cause, instanceOf(ConnectTransportException.class)); - assertThat(((ConnectTransportException)cause).node(), equalTo(nodeA)); + assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA)); } // wait for the transport to process the sending failure and disconnect from node @@ -1586,26 +1588,26 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { CountDownLatch latch = new CountDownLatch(1); serviceA.sendRequest(connection, "action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { - @Override - public TestResponse newInstance() { - return new TestResponse(); - } + @Override + public TestResponse newInstance() { + return new TestResponse(); + } - @Override - public void handleResponse(TestResponse response) { - latch.countDown(); - } + @Override + public void handleResponse(TestResponse response) { + latch.countDown(); + } - @Override - public void handleException(TransportException exp) { - latch.countDown(); - } + @Override + public void handleException(TransportException exp) { + latch.countDown(); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); assertFalse(requestProcessed.get()); @@ -1859,14 +1861,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testRegisterHandlerTwice() { serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), - (request, message) -> {throw new AssertionError("boom");}); + (request, message) -> { + throw new AssertionError("boom"); + }); expectThrows(IllegalArgumentException.class, () -> serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), - (request, message) -> {throw new AssertionError("boom");}) + (request, message) -> { + throw new AssertionError("boom"); + }) ); serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), - (request, message) -> {throw new AssertionError("boom");}); + (request, message) -> { + throw new AssertionError("boom"); + }); } public void testTimeoutPerConnection() throws IOException { @@ -1914,11 +1922,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testHandshakeWithIncompatVersion() { assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); + Version version = Version.fromString("2.0.0"); try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), - Version.fromString("2.0.0"))) { - transport.transportServiceAdapter(serviceA.new Adapter()); - transport.start(); + new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); @@ -1937,9 +1946,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),version)) { - transport.transportServiceAdapter(serviceA.new Adapter()); - transport.start(); + new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.fromString("2.0.0")); @@ -1956,24 +1966,26 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } } - public void testTcpHandshake() throws IOException, InterruptedException { assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); - try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, + MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) { @Override protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId, int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) throws IOException { return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress, - (byte)(status & ~(1<<3))); // we flip the isHandshake bit back and act like the handler is not found + (byte) (status & ~(1 << 3))); // we flip the isHandshake bit back and act like the handler is not found } - }) { - transport.transportServiceAdapter(serviceA.new Adapter()); - transport.start(); + }; + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, Version.CURRENT, threadPool, + null)) { + service.start(); + service.acceptIncomingRequests(); // this acts like a node that doesn't have support for handshakes DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); @@ -1986,7 +1998,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), null - ) ) { + )) { Version version = originalTransport.executeHandshake(connection.getNode(), connection.channel(TransportRequestOptions.Type.PING), TimeValue.timeValueSeconds(10)); assertEquals(version, Version.CURRENT); @@ -2105,8 +2117,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }; - serviceB.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler); - serviceA.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler); + serviceB.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler); + serviceA.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler); latch.await(); } @@ -2303,22 +2315,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { TransportRequestOptions.Type.STATE); try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here - TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake - assertEquals(1, transportStats.getRxCount()); - assertEquals(1, transportStats.getTxCount()); - assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(45, transportStats.getTxSize().getBytes()); - }); + TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, transportStats.getRxCount()); + assertEquals(1, transportStats.getTxCount()); + assertEquals(25, transportStats.getRxSize().getBytes()); + assertEquals(45, transportStats.getTxSize().getBytes()); + }); serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); receivedLatch.await(); assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here - TransportStats transportStats = serviceC.transport.getStats(); // request has ben send - assertEquals(1, transportStats.getRxCount()); - assertEquals(2, transportStats.getTxCount()); - assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(91, transportStats.getTxSize().getBytes()); - }); + TransportStats transportStats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, transportStats.getRxCount()); + assertEquals(2, transportStats.getTxCount()); + assertEquals(25, transportStats.getRxSize().getBytes()); + assertEquals(91, transportStats.getTxSize().getBytes()); + }); sendResponseLatch.countDown(); responseLatch.await(); stats = serviceC.transport.getStats(); // response has been received @@ -2398,22 +2410,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { TransportRequestOptions.Type.STATE); try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here - TransportStats transportStats = serviceC.transport.getStats(); // request has ben send - assertEquals(1, transportStats.getRxCount()); - assertEquals(1, transportStats.getTxCount()); - assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(45, transportStats.getTxSize().getBytes()); - }); + TransportStats transportStats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, transportStats.getRxCount()); + assertEquals(1, transportStats.getTxCount()); + assertEquals(25, transportStats.getRxSize().getBytes()); + assertEquals(45, transportStats.getTxSize().getBytes()); + }); serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); receivedLatch.await(); assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here - TransportStats transportStats = serviceC.transport.getStats(); // request has ben send - assertEquals(1, transportStats.getRxCount()); - assertEquals(2, transportStats.getTxCount()); - assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(91, transportStats.getTxSize().getBytes()); - }); + TransportStats transportStats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, transportStats.getRxCount()); + assertEquals(2, transportStats.getTxCount()); + assertEquals(25, transportStats.getRxSize().getBytes()); + assertEquals(91, transportStats.getTxSize().getBytes()); + }); sendResponseLatch.countDown(); responseLatch.await(); stats = serviceC.transport.getStats(); // exception response has been received