Merge tsa with ts (#26369)
We currently have a weird relationship between Transport, TransportService, and TransportServiceAdaptor. At some point I think that we would like to collapse these all into one concept as we only support TCP transports. This commit moves in that direction by eliminating the adaptor and just passing the transport service to the transport.
This commit is contained in:
parent
760bd6c568
commit
c1a20f7e48
|
@ -194,7 +194,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
protected final NetworkService networkService;
|
||||
protected final Set<ProfileSettings> profileSettings;
|
||||
|
||||
protected volatile TransportServiceAdapter transportServiceAdapter;
|
||||
protected volatile TransportService transportService;
|
||||
// node id to actual channel
|
||||
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
|
||||
|
||||
|
@ -270,11 +270,11 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
}
|
||||
|
||||
@Override
|
||||
public void transportServiceAdapter(TransportServiceAdapter service) {
|
||||
public void setTransportService(TransportService service) {
|
||||
if (service.getRequestHandler(HANDSHAKE_ACTION_NAME) != null) {
|
||||
throw new IllegalStateException(HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered");
|
||||
}
|
||||
this.transportServiceAdapter = service;
|
||||
this.transportService = service;
|
||||
}
|
||||
|
||||
private static class HandshakeResponseHandler<Channel> implements TransportResponseHandler<VersionHandshakeResponse> {
|
||||
|
@ -444,7 +444,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
try {
|
||||
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false);
|
||||
} finally {
|
||||
transportServiceAdapter.onConnectionClosed(this);
|
||||
transportService.onConnectionClosed(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -500,7 +500,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
logger.debug("connected to node [{}]", node);
|
||||
}
|
||||
try {
|
||||
transportServiceAdapter.onNodeConnected(node);
|
||||
transportService.onNodeConnected(node);
|
||||
} finally {
|
||||
if (nodeChannels.isClosed()) {
|
||||
// we got closed concurrently due to a disconnect or some other event on the channel.
|
||||
|
@ -512,7 +512,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
// try to remove it first either way one of the two wins even if the callback has run before we even added the
|
||||
// tuple to the map since in that case we remove it here again
|
||||
if (connectedNodes.remove(node, nodeChannels)) {
|
||||
transportServiceAdapter.onNodeDisconnected(node);
|
||||
transportService.onNodeDisconnected(node);
|
||||
}
|
||||
throw new NodeNotConnectedException(node, "connection concurrently closed");
|
||||
}
|
||||
|
@ -597,7 +597,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
connectTimeout : connectionProfile.getHandshakeTimeout();
|
||||
final Version version = executeHandshake(node, channel, handshakeTimeout);
|
||||
nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
|
||||
transportServiceAdapter.onConnectionOpened(nodeChannels);
|
||||
transportService.onConnectionOpened(nodeChannels);
|
||||
connectionRef.set(nodeChannels);
|
||||
success = true;
|
||||
return nodeChannels;
|
||||
|
@ -625,7 +625,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
if (closeLock.readLock().tryLock()) {
|
||||
try {
|
||||
if (connectedNodes.remove(node, nodeChannels)) {
|
||||
transportServiceAdapter.onNodeDisconnected(node);
|
||||
transportService.onNodeDisconnected(node);
|
||||
}
|
||||
} finally {
|
||||
closeLock.readLock().unlock();
|
||||
|
@ -665,7 +665,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
} finally {
|
||||
closeLock.readLock().unlock();
|
||||
if (nodeChannels != null) { // if we found it and removed it we close and notify
|
||||
IOUtils.closeWhileHandlingException(nodeChannels, () -> transportServiceAdapter.onNodeDisconnected(node));
|
||||
IOUtils.closeWhileHandlingException(nodeChannels, () -> transportService.onNodeDisconnected(node));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -916,7 +916,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
Map.Entry<DiscoveryNode, NodeChannels> next = iterator.next();
|
||||
try {
|
||||
IOUtils.closeWhileHandlingException(next.getValue());
|
||||
transportServiceAdapter.onNodeDisconnected(next.getKey());
|
||||
transportService.onNodeDisconnected(next.getKey());
|
||||
} finally {
|
||||
iterator.remove();
|
||||
}
|
||||
|
@ -1078,7 +1078,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
final TransportRequestOptions finalOptions = options;
|
||||
// this might be called in a different thread
|
||||
SendListener onRequestSent = new SendListener(stream,
|
||||
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length());
|
||||
() -> transportService.onRequestSent(node, requestId, action, request, finalOptions), message.length());
|
||||
internalSendMessage(targetChannel, message, onRequestSent);
|
||||
addedReleaseListener = true;
|
||||
} finally {
|
||||
|
@ -1125,7 +1125,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
|
||||
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
|
||||
SendListener onResponseSent = new SendListener(null,
|
||||
() -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length());
|
||||
() -> transportService.onResponseSent(requestId, action, error), message.length());
|
||||
internalSendMessage(channel, message, onResponseSent);
|
||||
}
|
||||
}
|
||||
|
@ -1160,7 +1160,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
final TransportResponseOptions finalOptions = options;
|
||||
// this might be called in a different thread
|
||||
SendListener listener = new SendListener(stream,
|
||||
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length());
|
||||
() -> transportService.onResponseSent(requestId, action, response, finalOptions), message.length());
|
||||
internalSendMessage(channel, message, listener);
|
||||
addedReleaseListener = true;
|
||||
} finally {
|
||||
|
@ -1356,14 +1356,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
if (isHandshake) {
|
||||
handler = pendingHandshakes.remove(requestId);
|
||||
} else {
|
||||
TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
TransportResponseHandler theHandler = transportService.onResponseReceived(requestId);
|
||||
if (theHandler == null && TransportStatus.isError(status)) {
|
||||
handler = pendingHandshakes.remove(requestId);
|
||||
} else {
|
||||
handler = theHandler;
|
||||
}
|
||||
}
|
||||
// ignore if its null, the adapter logs it
|
||||
// ignore if its null, the service logs it
|
||||
if (handler != null) {
|
||||
if (TransportStatus.isError(status)) {
|
||||
handlerResponseError(streamIn, handler);
|
||||
|
@ -1456,7 +1456,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId, int messageLengthBytes,
|
||||
Version version, InetSocketAddress remoteAddress, byte status) throws IOException {
|
||||
final String action = stream.readString();
|
||||
transportServiceAdapter.onRequestReceived(requestId, action);
|
||||
transportService.onRequestReceived(requestId, action);
|
||||
TransportChannel transportChannel = null;
|
||||
try {
|
||||
if (TransportStatus.isHandshake(status)) {
|
||||
|
@ -1464,7 +1464,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
|
||||
TransportStatus.setHandshake((byte) 0));
|
||||
} else {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
final RequestHandlerRegistry reg = transportService.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
throw new ActionNotFoundTransportException(action);
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ public interface Transport extends LifecycleComponent {
|
|||
|
||||
Setting<Boolean> TRANSPORT_TCP_COMPRESS = Setting.boolSetting("transport.tcp.compress", false, Property.NodeScope);
|
||||
|
||||
void transportServiceAdapter(TransportServiceAdapter service);
|
||||
void setTransportService(TransportService service);
|
||||
|
||||
/**
|
||||
* The address the transport is bound on.
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -106,8 +105,6 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
}
|
||||
});
|
||||
|
||||
private final TransportService.Adapter adapter;
|
||||
|
||||
public static final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {};
|
||||
|
||||
// tracer log
|
||||
|
@ -148,7 +145,7 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
* Build the service.
|
||||
*
|
||||
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
|
||||
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
|
||||
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
|
||||
*/
|
||||
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
|
||||
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings) {
|
||||
|
@ -160,7 +157,6 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings));
|
||||
setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
|
||||
tracerLog = Loggers.getLogger(logger, ".tracer");
|
||||
adapter = createAdapter();
|
||||
taskManager = createTaskManager();
|
||||
this.interceptor = transportInterceptor;
|
||||
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
|
||||
|
@ -187,10 +183,6 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
return taskManager;
|
||||
}
|
||||
|
||||
protected Adapter createAdapter() {
|
||||
return new Adapter();
|
||||
}
|
||||
|
||||
protected TaskManager createTaskManager() {
|
||||
return new TaskManager(settings);
|
||||
}
|
||||
|
@ -205,7 +197,7 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
transport.transportServiceAdapter(adapter);
|
||||
transport.setTransportService(this);
|
||||
transport.start();
|
||||
|
||||
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
|
||||
|
@ -632,11 +624,11 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
}
|
||||
|
||||
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
|
||||
final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool);
|
||||
final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, this, threadPool);
|
||||
try {
|
||||
adapter.onRequestSent(localNode, requestId, action, request, options);
|
||||
adapter.onRequestReceived(requestId, action);
|
||||
final RequestHandlerRegistry reg = adapter.getRequestHandler(action);
|
||||
onRequestSent(localNode, requestId, action, request, options);
|
||||
onRequestReceived(requestId, action);
|
||||
final RequestHandlerRegistry reg = getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
|
||||
}
|
||||
|
@ -782,177 +774,171 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
}
|
||||
}
|
||||
|
||||
protected RequestHandlerRegistry getRequestHandler(String action) {
|
||||
/** called by the {@link Transport} implementation once a request has been sent */
|
||||
void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceRequestSent(node, requestId, action, options);
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean traceEnabled() {
|
||||
return tracerLog.isTraceEnabled();
|
||||
}
|
||||
|
||||
/** called by the {@link Transport} implementation once a response was sent to calling node */
|
||||
void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceResponseSent(requestId, action);
|
||||
}
|
||||
}
|
||||
|
||||
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
|
||||
void onResponseSent(long requestId, String action, Exception e) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceResponseSent(requestId, action, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void traceResponseSent(long requestId, String action, Exception e) {
|
||||
tracerLog.trace(
|
||||
(org.apache.logging.log4j.util.Supplier<?>)
|
||||
() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e);
|
||||
}
|
||||
|
||||
/**
|
||||
* called by the {@link Transport} implementation when an incoming request arrives but before
|
||||
* any parsing of it has happened (with the exception of the requestId and action)
|
||||
*/
|
||||
void onRequestReceived(long requestId, String action) {
|
||||
try {
|
||||
blockIncomingRequestsLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
logger.trace("interrupted while waiting for incoming requests block to be removed");
|
||||
}
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceReceivedRequest(requestId, action);
|
||||
}
|
||||
}
|
||||
|
||||
public RequestHandlerRegistry getRequestHandler(String action) {
|
||||
return requestHandlers.get(action);
|
||||
}
|
||||
|
||||
protected class Adapter implements TransportServiceAdapter {
|
||||
/**
|
||||
* called by the {@link Transport} implementation when a response or an exception has been received for a previously
|
||||
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
|
||||
* found.
|
||||
*/
|
||||
public TransportResponseHandler onResponseReceived(final long requestId) {
|
||||
RequestHolder holder = clientHandlers.remove(requestId);
|
||||
|
||||
@Override
|
||||
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceRequestSent(node, requestId, action, options);
|
||||
}
|
||||
if (holder == null) {
|
||||
checkForTimeout(requestId);
|
||||
return null;
|
||||
}
|
||||
|
||||
protected boolean traceEnabled() {
|
||||
return tracerLog.isTraceEnabled();
|
||||
holder.cancelTimeout();
|
||||
if (traceEnabled() && shouldTraceAction(holder.action())) {
|
||||
traceReceivedResponse(requestId, holder.connection().getNode(), holder.action());
|
||||
}
|
||||
return holder.handler();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceResponseSent(requestId, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponseSent(long requestId, String action, Exception e) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceResponseSent(requestId, action, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void traceResponseSent(long requestId, String action, Exception e) {
|
||||
tracerLog.trace(
|
||||
(org.apache.logging.log4j.util.Supplier<?>)
|
||||
() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRequestReceived(long requestId, String action) {
|
||||
try {
|
||||
blockIncomingRequestsLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
logger.trace("interrupted while waiting for incoming requests block to be removed");
|
||||
}
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceReceivedRequest(requestId, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestHandlerRegistry getRequestHandler(String action) {
|
||||
return requestHandlers.get(action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransportResponseHandler onResponseReceived(final long requestId) {
|
||||
RequestHolder holder = clientHandlers.remove(requestId);
|
||||
|
||||
if (holder == null) {
|
||||
checkForTimeout(requestId);
|
||||
return null;
|
||||
}
|
||||
holder.cancelTimeout();
|
||||
if (traceEnabled() && shouldTraceAction(holder.action())) {
|
||||
traceReceivedResponse(requestId, holder.connection().getNode(), holder.action());
|
||||
}
|
||||
return holder.handler();
|
||||
}
|
||||
|
||||
protected void checkForTimeout(long requestId) {
|
||||
// lets see if its in the timeout holder, but sync on mutex to make sure any ongoing timeout handling has finished
|
||||
final DiscoveryNode sourceNode;
|
||||
final String action;
|
||||
assert clientHandlers.get(requestId) == null;
|
||||
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
|
||||
if (timeoutInfoHolder != null) {
|
||||
long time = System.currentTimeMillis();
|
||||
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
|
||||
private void checkForTimeout(long requestId) {
|
||||
// lets see if its in the timeout holder, but sync on mutex to make sure any ongoing timeout handling has finished
|
||||
final DiscoveryNode sourceNode;
|
||||
final String action;
|
||||
assert clientHandlers.get(requestId) == null;
|
||||
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
|
||||
if (timeoutInfoHolder != null) {
|
||||
long time = System.currentTimeMillis();
|
||||
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
|
||||
"action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(),
|
||||
timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
|
||||
action = timeoutInfoHolder.action();
|
||||
sourceNode = timeoutInfoHolder.node();
|
||||
} else {
|
||||
logger.warn("Transport response handler not found of id [{}]", requestId);
|
||||
action = null;
|
||||
sourceNode = null;
|
||||
}
|
||||
// call tracer out of lock
|
||||
if (traceEnabled() == false) {
|
||||
return;
|
||||
}
|
||||
if (action == null) {
|
||||
assert sourceNode == null;
|
||||
traceUnresolvedResponse(requestId);
|
||||
} else if (shouldTraceAction(action)) {
|
||||
traceReceivedResponse(requestId, sourceNode, action);
|
||||
}
|
||||
timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
|
||||
action = timeoutInfoHolder.action();
|
||||
sourceNode = timeoutInfoHolder.node();
|
||||
} else {
|
||||
logger.warn("Transport response handler not found of id [{}]", requestId);
|
||||
action = null;
|
||||
sourceNode = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNodeConnected(final DiscoveryNode node) {
|
||||
// capture listeners before spawning the background callback so the following pattern won't trigger a call
|
||||
// connectToNode(); connection is completed successfully
|
||||
// addConnectionListener(); this listener shouldn't be called
|
||||
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
|
||||
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
|
||||
// call tracer out of lock
|
||||
if (traceEnabled() == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionOpened(Transport.Connection connection) {
|
||||
// capture listeners before spawning the background callback so the following pattern won't trigger a call
|
||||
// connectToNode(); connection is completed successfully
|
||||
// addConnectionListener(); this listener shouldn't be called
|
||||
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
|
||||
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
|
||||
if (action == null) {
|
||||
assert sourceNode == null;
|
||||
traceUnresolvedResponse(requestId);
|
||||
} else if (shouldTraceAction(action)) {
|
||||
traceReceivedResponse(requestId, sourceNode, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNodeDisconnected(final DiscoveryNode node) {
|
||||
try {
|
||||
threadPool.generic().execute( () -> {
|
||||
for (final TransportConnectionListener connectionListener : connectionListeners) {
|
||||
connectionListener.onNodeDisconnected(node);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Rejected execution on NodeDisconnected", ex);
|
||||
}
|
||||
void onNodeConnected(final DiscoveryNode node) {
|
||||
// capture listeners before spawning the background callback so the following pattern won't trigger a call
|
||||
// connectToNode(); connection is completed successfully
|
||||
// addConnectionListener(); this listener shouldn't be called
|
||||
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
|
||||
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
|
||||
}
|
||||
|
||||
void onConnectionOpened(Transport.Connection connection) {
|
||||
// capture listeners before spawning the background callback so the following pattern won't trigger a call
|
||||
// connectToNode(); connection is completed successfully
|
||||
// addConnectionListener(); this listener shouldn't be called
|
||||
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
|
||||
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
|
||||
}
|
||||
|
||||
public void onNodeDisconnected(final DiscoveryNode node) {
|
||||
try {
|
||||
threadPool.generic().execute( () -> {
|
||||
for (final TransportConnectionListener connectionListener : connectionListeners) {
|
||||
connectionListener.onNodeDisconnected(node);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Rejected execution on NodeDisconnected", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionClosed(Transport.Connection connection) {
|
||||
try {
|
||||
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
|
||||
RequestHolder holder = entry.getValue();
|
||||
if (holder.connection().getCacheKey().equals(connection.getCacheKey())) {
|
||||
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
|
||||
if (holderToNotify != null) {
|
||||
// callback that an exception happened, but on a different thread since we don't
|
||||
// want handlers to worry about stack overflows
|
||||
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
|
||||
connection.getNode(), holderToNotify.action())));
|
||||
}
|
||||
void onConnectionClosed(Transport.Connection connection) {
|
||||
try {
|
||||
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
|
||||
RequestHolder holder = entry.getValue();
|
||||
if (holder.connection().getCacheKey().equals(connection.getCacheKey())) {
|
||||
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
|
||||
if (holderToNotify != null) {
|
||||
// callback that an exception happened, but on a different thread since we don't
|
||||
// want handlers to worry about stack overflows
|
||||
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
|
||||
connection.getNode(), holderToNotify.action())));
|
||||
}
|
||||
}
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Rejected execution on onConnectionClosed", ex);
|
||||
}
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Rejected execution on onConnectionClosed", ex);
|
||||
}
|
||||
}
|
||||
|
||||
protected void traceReceivedRequest(long requestId, String action) {
|
||||
tracerLog.trace("[{}][{}] received request", requestId, action);
|
||||
}
|
||||
protected void traceReceivedRequest(long requestId, String action) {
|
||||
tracerLog.trace("[{}][{}] received request", requestId, action);
|
||||
}
|
||||
|
||||
protected void traceResponseSent(long requestId, String action) {
|
||||
tracerLog.trace("[{}][{}] sent response", requestId, action);
|
||||
}
|
||||
protected void traceResponseSent(long requestId, String action) {
|
||||
tracerLog.trace("[{}][{}] sent response", requestId, action);
|
||||
}
|
||||
|
||||
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
|
||||
tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode);
|
||||
}
|
||||
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
|
||||
tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode);
|
||||
}
|
||||
|
||||
protected void traceUnresolvedResponse(long requestId) {
|
||||
tracerLog.trace("[{}] received response but can't resolve it to a request", requestId);
|
||||
}
|
||||
|
||||
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
||||
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
|
||||
}
|
||||
protected void traceUnresolvedResponse(long requestId) {
|
||||
tracerLog.trace("[{}] received response but can't resolve it to a request", requestId);
|
||||
}
|
||||
|
||||
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
||||
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
|
||||
}
|
||||
|
||||
class TimeoutHandler implements Runnable {
|
||||
|
@ -1118,16 +1104,16 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
final DiscoveryNode localNode;
|
||||
private final String action;
|
||||
private final long requestId;
|
||||
final TransportServiceAdapter adapter;
|
||||
final TransportService service;
|
||||
final ThreadPool threadPool;
|
||||
|
||||
DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId,
|
||||
TransportServiceAdapter adapter, ThreadPool threadPool) {
|
||||
TransportService service, ThreadPool threadPool) {
|
||||
this.logger = logger;
|
||||
this.localNode = localNode;
|
||||
this.action = action;
|
||||
this.requestId = requestId;
|
||||
this.adapter = adapter;
|
||||
this.service = service;
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
|
@ -1148,9 +1134,9 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
|
||||
@Override
|
||||
public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
adapter.onResponseSent(requestId, action, response, options);
|
||||
final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
|
||||
// ignore if its null, the adapter logs it
|
||||
service.onResponseSent(requestId, action, response, options);
|
||||
final TransportResponseHandler handler = service.onResponseReceived(requestId);
|
||||
// ignore if its null, the service logs it
|
||||
if (handler != null) {
|
||||
final String executor = handler.executor();
|
||||
if (ThreadPool.Names.SAME.equals(executor)) {
|
||||
|
@ -1172,9 +1158,9 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
|
||||
@Override
|
||||
public void sendResponse(Exception exception) throws IOException {
|
||||
adapter.onResponseSent(requestId, action, exception);
|
||||
final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
|
||||
// ignore if its null, the adapter logs it
|
||||
service.onResponseSent(requestId, action, exception);
|
||||
final TransportResponseHandler handler = service.onResponseReceived(requestId);
|
||||
// ignore if its null, the service logs it
|
||||
if (handler != null) {
|
||||
final RemoteTransportException rtx = wrapInRemote(exception);
|
||||
final String executor = handler.executor();
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
||||
public interface TransportServiceAdapter extends TransportConnectionListener {
|
||||
|
||||
/** called by the {@link Transport} implementation once a request has been sent */
|
||||
void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options);
|
||||
|
||||
/** called by the {@link Transport} implementation once a response was sent to calling node */
|
||||
void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options);
|
||||
|
||||
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
|
||||
void onResponseSent(long requestId, String action, Exception e);
|
||||
|
||||
/**
|
||||
* called by the {@link Transport} implementation when a response or an exception has been received for a previously
|
||||
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
|
||||
* found.
|
||||
*/
|
||||
TransportResponseHandler onResponseReceived(long requestId);
|
||||
|
||||
/**
|
||||
* called by the {@link Transport} implementation when an incoming request arrives but before
|
||||
* any parsing of it has happened (with the exception of the requestId and action)
|
||||
*/
|
||||
void onRequestReceived(long requestId, String action);
|
||||
|
||||
RequestHandlerRegistry getRequestHandler(String action);
|
||||
}
|
|
@ -174,10 +174,10 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
|||
return discoveryNode.get();
|
||||
};
|
||||
transportService = new TransportService(settings,
|
||||
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
|
||||
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
|
||||
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
|
||||
new NetworkService(Collections.emptyList())),
|
||||
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) {
|
||||
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) {
|
||||
@Override
|
||||
protected TaskManager createTaskManager() {
|
||||
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.elasticsearch.transport.TransportRequest;
|
|||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -60,7 +60,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
|
||||
private boolean connectMode = true;
|
||||
|
||||
private TransportServiceAdapter transportServiceAdapter;
|
||||
private TransportService transportService;
|
||||
|
||||
private final AtomicInteger connectTransportExceptions = new AtomicInteger();
|
||||
private final AtomicInteger failures = new AtomicInteger();
|
||||
|
@ -90,12 +90,12 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
|
||||
if (connectMode) {
|
||||
if (TransportLivenessAction.NAME.equals(action)) {
|
||||
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
TransportResponseHandler transportResponseHandler = transportService.onResponseReceived(requestId);
|
||||
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.
|
||||
getDefault(Settings.EMPTY),
|
||||
node));
|
||||
} else if (ClusterStateAction.NAME.equals(action)) {
|
||||
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
TransportResponseHandler transportResponseHandler = transportService.onResponseReceived(requestId);
|
||||
ClusterState clusterState = getMockClusterState(node);
|
||||
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState, 0L));
|
||||
} else {
|
||||
|
@ -116,7 +116,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
//throw whatever exception that is not a subclass of ConnectTransportException
|
||||
throw new IllegalStateException();
|
||||
} else {
|
||||
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
TransportResponseHandler transportResponseHandler = transportService.onResponseReceived(requestId);
|
||||
if (random.nextBoolean()) {
|
||||
successes.incrementAndGet();
|
||||
transportResponseHandler.handleResponse(newResponse());
|
||||
|
@ -163,8 +163,8 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
}
|
||||
|
||||
@Override
|
||||
public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) {
|
||||
this.transportServiceAdapter = transportServiceAdapter;
|
||||
public void setTransportService(TransportService transportServiceAdapter) {
|
||||
this.transportService = transportServiceAdapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -61,7 +61,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.transport.TransportException;
|
|||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -176,7 +175,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
volatile boolean randomConnectionExceptions = false;
|
||||
|
||||
@Override
|
||||
public void transportServiceAdapter(TransportServiceAdapter service) {
|
||||
public void setTransportService(TransportService service) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -75,7 +75,6 @@ import org.elasticsearch.env.Environment;
|
|||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.gateway.TestGatewayAllocator;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -87,7 +86,6 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.elasticsearch.transport.TransportException;
|
|||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -60,7 +60,7 @@ import static org.apache.lucene.util.LuceneTestCase.rarely;
|
|||
/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
|
||||
public class CapturingTransport implements Transport {
|
||||
|
||||
private TransportServiceAdapter adapter;
|
||||
private TransportService transportService;
|
||||
|
||||
public static class CapturedRequest {
|
||||
public final DiscoveryNode node;
|
||||
|
@ -137,7 +137,7 @@ public class CapturingTransport implements Transport {
|
|||
|
||||
/** simulate a response for the given requestId */
|
||||
public void handleResponse(final long requestId, final TransportResponse response) {
|
||||
adapter.onResponseReceived(requestId).handleResponse(response);
|
||||
transportService.onResponseReceived(requestId).handleResponse(response);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -189,7 +189,7 @@ public class CapturingTransport implements Transport {
|
|||
* @param e the failure
|
||||
*/
|
||||
public void handleError(final long requestId, final TransportException e) {
|
||||
adapter.onResponseReceived(requestId).handleException(e);
|
||||
transportService.onResponseReceived(requestId).handleException(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -220,8 +220,8 @@ public class CapturingTransport implements Transport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void transportServiceAdapter(TransportServiceAdapter adapter) {
|
||||
this.adapter = adapter;
|
||||
public void setTransportService(TransportService transportService) {
|
||||
this.transportService = transportService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
|
@ -58,7 +57,6 @@ import org.elasticsearch.transport.TransportInterceptor;
|
|||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -101,16 +99,16 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool,
|
||||
@Nullable ClusterSettings clusterSettings) {
|
||||
@Nullable ClusterSettings clusterSettings) {
|
||||
// some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means
|
||||
// concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might
|
||||
// be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use
|
||||
// a different default port range per JVM unless the incoming settings override it
|
||||
int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port
|
||||
settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort+100)).put(settings).build();
|
||||
settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
||||
final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
|
||||
return createNewService(settings, transport, version, threadPool, clusterSettings);
|
||||
}
|
||||
|
||||
|
@ -118,8 +116,8 @@ public final class MockTransportService extends TransportService {
|
|||
@Nullable ClusterSettings clusterSettings) {
|
||||
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
boundAddress ->
|
||||
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
|
||||
Node.NODE_ATTRIBUTES.get(settings).getAsMap(), DiscoveryNode.getRolesFromSettings(settings), version),
|
||||
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
|
||||
Node.NODE_ATTRIBUTES.get(settings).getAsMap(), DiscoveryNode.getRolesFromSettings(settings), version),
|
||||
clusterSettings);
|
||||
}
|
||||
|
||||
|
@ -129,10 +127,10 @@ public final class MockTransportService extends TransportService {
|
|||
* Build the service.
|
||||
*
|
||||
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
|
||||
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
|
||||
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
|
||||
*/
|
||||
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
|
||||
@Nullable ClusterSettings clusterSettings) {
|
||||
@Nullable ClusterSettings clusterSettings) {
|
||||
this(settings, transport, threadPool, interceptor, (boundAddress) ->
|
||||
DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), settings.get(Node.NODE_NAME_SETTING.getKey(),
|
||||
UUIDs.randomBase64UUID())), clusterSettings);
|
||||
|
@ -142,7 +140,7 @@ public final class MockTransportService extends TransportService {
|
|||
* Build the service.
|
||||
*
|
||||
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
|
||||
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
|
||||
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
|
||||
*/
|
||||
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
|
||||
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
|
||||
|
@ -163,7 +161,7 @@ public final class MockTransportService extends TransportService {
|
|||
protected TaskManager createTaskManager() {
|
||||
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
|
||||
return new MockTaskManager(settings);
|
||||
} else {
|
||||
} else {
|
||||
return super.createTaskManager();
|
||||
}
|
||||
}
|
||||
|
@ -547,8 +545,8 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void transportServiceAdapter(TransportServiceAdapter service) {
|
||||
transport.transportServiceAdapter(service);
|
||||
public void setTransportService(TransportService service) {
|
||||
transport.setTransportService(service);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -641,7 +639,9 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() { transport.close(); }
|
||||
public void close() {
|
||||
transport.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BoundTransportAddress> profileBoundAddresses() {
|
||||
|
@ -704,55 +704,47 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Adapter createAdapter() {
|
||||
return new MockAdapter();
|
||||
protected boolean traceEnabled() {
|
||||
return super.traceEnabled() || activeTracers.isEmpty() == false;
|
||||
}
|
||||
|
||||
class MockAdapter extends Adapter {
|
||||
|
||||
@Override
|
||||
protected boolean traceEnabled() {
|
||||
return super.traceEnabled() || activeTracers.isEmpty() == false;
|
||||
@Override
|
||||
protected void traceReceivedRequest(long requestId, String action) {
|
||||
super.traceReceivedRequest(requestId, action);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.receivedRequest(requestId, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void traceReceivedRequest(long requestId, String action) {
|
||||
super.traceReceivedRequest(requestId, action);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.receivedRequest(requestId, action);
|
||||
}
|
||||
@Override
|
||||
protected void traceResponseSent(long requestId, String action) {
|
||||
super.traceResponseSent(requestId, action);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.responseSent(requestId, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void traceResponseSent(long requestId, String action) {
|
||||
super.traceResponseSent(requestId, action);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.responseSent(requestId, action);
|
||||
}
|
||||
@Override
|
||||
protected void traceResponseSent(long requestId, String action, Exception e) {
|
||||
super.traceResponseSent(requestId, action, e);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.responseSent(requestId, action, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void traceResponseSent(long requestId, String action, Exception e) {
|
||||
super.traceResponseSent(requestId, action, e);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.responseSent(requestId, action, e);
|
||||
}
|
||||
@Override
|
||||
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
|
||||
super.traceReceivedResponse(requestId, sourceNode, action);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.receivedResponse(requestId, sourceNode, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
|
||||
super.traceReceivedResponse(requestId, sourceNode, action);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.receivedResponse(requestId, sourceNode, action);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
||||
super.traceRequestSent(node, requestId, action, options);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.requestSent(node, requestId, action, options);
|
||||
}
|
||||
@Override
|
||||
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
||||
super.traceRequestSent(node, requestId, action, options);
|
||||
for (Tracer tracer : activeTracers) {
|
||||
tracer.requestSent(node, requestId, action, options);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -802,6 +794,7 @@ public final class MockTransportService extends TransportService {
|
|||
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) {
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
|
|
|
@ -1047,9 +1047,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public volatile boolean sawResponseReceived;
|
||||
|
||||
public AtomicReference<CountDownLatch> expectedEvents = new AtomicReference<>();
|
||||
|
||||
Tracer(Set<String> actions) {
|
||||
this.actions = actions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receivedRequest(long requestId, String action) {
|
||||
super.receivedRequest(requestId, action);
|
||||
|
@ -1446,7 +1448,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public void handleException(TransportException exp) {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(exp);
|
||||
assertThat(cause, instanceOf(ConnectTransportException.class));
|
||||
assertThat(((ConnectTransportException)cause).node(), equalTo(nodeA));
|
||||
assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -1456,7 +1458,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
} catch (Exception e) {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
assertThat(cause, instanceOf(ConnectTransportException.class));
|
||||
assertThat(((ConnectTransportException)cause).node(), equalTo(nodeA));
|
||||
assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA));
|
||||
}
|
||||
|
||||
// wait for the transport to process the sending failure and disconnect from node
|
||||
|
@ -1586,26 +1588,26 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
CountDownLatch latch = new CountDownLatch(1);
|
||||
serviceA.sendRequest(connection, "action", new TestRequest(), TransportRequestOptions.EMPTY,
|
||||
new TransportResponseHandler<TestResponse>() {
|
||||
@Override
|
||||
public TestResponse newInstance() {
|
||||
return new TestResponse();
|
||||
}
|
||||
@Override
|
||||
public TestResponse newInstance() {
|
||||
return new TestResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(TestResponse response) {
|
||||
latch.countDown();
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(TestResponse response) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
latch.countDown();
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
|
||||
assertFalse(requestProcessed.get());
|
||||
|
||||
|
@ -1859,14 +1861,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testRegisterHandlerTwice() {
|
||||
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
|
||||
(request, message) -> {throw new AssertionError("boom");});
|
||||
(request, message) -> {
|
||||
throw new AssertionError("boom");
|
||||
});
|
||||
expectThrows(IllegalArgumentException.class, () ->
|
||||
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
|
||||
(request, message) -> {throw new AssertionError("boom");})
|
||||
(request, message) -> {
|
||||
throw new AssertionError("boom");
|
||||
})
|
||||
);
|
||||
|
||||
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
|
||||
(request, message) -> {throw new AssertionError("boom");});
|
||||
(request, message) -> {
|
||||
throw new AssertionError("boom");
|
||||
});
|
||||
}
|
||||
|
||||
public void testTimeoutPerConnection() throws IOException {
|
||||
|
@ -1914,11 +1922,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public void testHandshakeWithIncompatVersion() {
|
||||
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
Version version = Version.fromString("2.0.0");
|
||||
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
|
||||
Version.fromString("2.0.0"))) {
|
||||
transport.transportServiceAdapter(serviceA.new Adapter());
|
||||
transport.start();
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
|
||||
MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
DiscoveryNode node =
|
||||
new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
|
@ -1937,9 +1946,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
|
||||
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),version)) {
|
||||
transport.transportServiceAdapter(serviceA.new Adapter());
|
||||
transport.start();
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
|
||||
MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
DiscoveryNode node =
|
||||
new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(),
|
||||
Version.fromString("2.0.0"));
|
||||
|
@ -1956,24 +1966,26 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public void testTcpHandshake() throws IOException, InterruptedException {
|
||||
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
||||
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
|
||||
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
|
||||
@Override
|
||||
protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId,
|
||||
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
|
||||
throws IOException {
|
||||
return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress,
|
||||
(byte)(status & ~(1<<3))); // we flip the isHandshake bit back and act like the handler is not found
|
||||
(byte) (status & ~(1 << 3))); // we flip the isHandshake bit back and act like the handler is not found
|
||||
}
|
||||
}) {
|
||||
transport.transportServiceAdapter(serviceA.new Adapter());
|
||||
transport.start();
|
||||
};
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, Version.CURRENT, threadPool,
|
||||
null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
// this acts like a node that doesn't have support for handshakes
|
||||
DiscoveryNode node =
|
||||
new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
|
@ -1986,7 +1998,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
TcpTransport.NodeChannels connection = originalTransport.openConnection(
|
||||
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
|
||||
null
|
||||
) ) {
|
||||
)) {
|
||||
Version version = originalTransport.executeHandshake(connection.getNode(),
|
||||
connection.channel(TransportRequestOptions.Type.PING), TimeValue.timeValueSeconds(10));
|
||||
assertEquals(version, Version.CURRENT);
|
||||
|
@ -2105,8 +2117,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
};
|
||||
|
||||
serviceB.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler);
|
||||
serviceA.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler);
|
||||
serviceB.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler);
|
||||
serviceA.sendRequest(nodeA, "action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler);
|
||||
latch.await();
|
||||
}
|
||||
|
||||
|
@ -2303,22 +2315,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
TransportRequestOptions.Type.STATE);
|
||||
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
|
||||
assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(1, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(45, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(1, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(45, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
|
||||
transportResponseHandler);
|
||||
receivedLatch.await();
|
||||
assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(2, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(91, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(2, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(91, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
sendResponseLatch.countDown();
|
||||
responseLatch.await();
|
||||
stats = serviceC.transport.getStats(); // response has been received
|
||||
|
@ -2398,22 +2410,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
TransportRequestOptions.Type.STATE);
|
||||
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
|
||||
assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(1, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(45, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(1, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(45, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
|
||||
transportResponseHandler);
|
||||
receivedLatch.await();
|
||||
assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(2, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(91, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
||||
assertEquals(1, transportStats.getRxCount());
|
||||
assertEquals(2, transportStats.getTxCount());
|
||||
assertEquals(25, transportStats.getRxSize().getBytes());
|
||||
assertEquals(91, transportStats.getTxSize().getBytes());
|
||||
});
|
||||
sendResponseLatch.countDown();
|
||||
responseLatch.await();
|
||||
stats = serviceC.transport.getStats(); // exception response has been received
|
||||
|
|
Loading…
Reference in New Issue