Transport: added a simple request tracer, logging incoming and outgoing requests

The request tracer logs in TRACE level under the `transport.tracer` log and is dynamically configurable with include and exclude arrays to filter out unneeded info. By default all requests are logged with the exception of fault detection pings (fired every second).

add the notion of tracers in the MockTransportService for testing purposes

Closes #9286
This commit is contained in:
Boaz Leskes 2015-01-08 13:31:28 +01:00
parent b7e49f11ed
commit e9dbfa9ee6
14 changed files with 666 additions and 173 deletions

View File

@ -83,3 +83,34 @@ The following parameters can be configured like that
This is a handy transport to use when running integration tests within
the JVM. It is automatically enabled when using
`NodeBuilder#local(true)`.
[float]
coming[1.5.0]
=== Transport Tracer
The transport module has a dedicated tracer logger which, when activated, logs incoming and out going requests. The log can be dynamically activated
by settings the level of the `transport.tracer` logger to `TRACE`:
[source,js]
--------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"logger.transport.tracer" : "TRACE"
}
}'
--------------------------------------------------
You can also control which actions will be traced, using a set of include and exclude wildcard patterns. By default every request will be traced
except for fault detection pings:
[source,js]
--------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"transport.tracer.include" : "*"
"transport.tracer.exclude" : "internal:discovery/zen/fd*"
}
}'
--------------------------------------------------

View File

@ -23,10 +23,16 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -34,9 +40,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@ -74,7 +81,18 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
});
private final TransportService.Adapter adapter = new Adapter();
private final TransportService.Adapter adapter;
// tracer log
public static final String SETTING_TRACE_LOG_INCLUDE = "transport.tracer.include";
public static final String SETTING_TRACE_LOG_EXCLUDE = "transport.tracer.exclude";
private final ESLogger tracerLog;
volatile String[] tracerLogInclude;
volatile String[] tracelLogExclude;
private final ApplySettings settingsListener = new ApplySettings();
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool);
@ -85,6 +103,46 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
super(settings);
this.transport = transport;
this.threadPool = threadPool;
this.tracerLogInclude = settings.getAsArray(SETTING_TRACE_LOG_INCLUDE, Strings.EMPTY_ARRAY, true);
this.tracelLogExclude = settings.getAsArray(SETTING_TRACE_LOG_EXCLUDE, new String[]{"internal:discovery/zen/fd*"}, true);
tracerLog = Loggers.getLogger(logger, ".tracer");
adapter = createAdapter();
}
protected Adapter createAdapter() {
return new Adapter();
}
// These need to be optional as they don't exist in the context of a transport client
@Inject(optional = true)
public void setDynamicSettings(NodeSettingsService nodeSettingsService, @ClusterDynamicSettings DynamicSettings dynamicSettings) {
dynamicSettings.addDynamicSettings(SETTING_TRACE_LOG_INCLUDE, SETTING_TRACE_LOG_INCLUDE + ".*");
dynamicSettings.addDynamicSettings(SETTING_TRACE_LOG_EXCLUDE, SETTING_TRACE_LOG_EXCLUDE + ".*");
nodeSettingsService.addListener(settingsListener);
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String[] newTracerLogInclude = settings.getAsArray(SETTING_TRACE_LOG_INCLUDE, TransportService.this.tracerLogInclude, true);
String[] newTracerLogExclude = settings.getAsArray(SETTING_TRACE_LOG_EXCLUDE, TransportService.this.tracelLogExclude, true);
if (newTracerLogInclude == TransportService.this.tracerLogInclude && newTracerLogExclude == TransportService.this.tracelLogExclude) {
return;
}
if (Arrays.equals(newTracerLogInclude, TransportService.this.tracerLogInclude) &&
Arrays.equals(newTracerLogExclude, TransportService.this.tracelLogExclude)) {
return;
}
TransportService.this.tracerLogInclude = newTracerLogInclude;
TransportService.this.tracelLogExclude = newTracerLogExclude;
logger.info("tracer log updated to use include: {}, exclude: {}", newTracerLogInclude, newTracerLogExclude);
}
}
// used for testing
public void applySettings(Settings settings) {
settingsListener.onRefreshSettings(settings);
}
@Override
@ -234,6 +292,18 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
private boolean shouldTraceAction(String action) {
if (tracerLogInclude.length > 0) {
if (Regex.simpleMatch(tracerLogInclude, action) == false) {
return false;
}
}
if (tracelLogExclude.length > 0) {
return !Regex.simpleMatch(tracelLogExclude, action);
}
return true;
}
private long newRequestId() {
return requestIds.getAndIncrement();
}
@ -262,7 +332,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return serverHandlers.get(action);
}
class Adapter implements TransportServiceAdapter {
protected class Adapter implements TransportServiceAdapter {
final MeanMetric rxMetric = new MeanMetric();
final MeanMetric txMetric = new MeanMetric();
@ -277,6 +347,68 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
txMetric.inc(size);
}
@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) {
if (traceEnabled() && shouldTraceAction(action)) {
traceRequestSent(node, requestId, action, options);
}
}
protected boolean traceEnabled() {
return tracerLog.isTraceEnabled();
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options) {
if (traceEnabled() && shouldTraceAction(action)) {
traceResponseSent(requestId, action);
}
}
@Override
public void onResponseSent(long requestId, String action, Throwable t) {
if (traceEnabled() && shouldTraceAction(action)) {
traceResponseSent(requestId, action, t);
}
}
protected void traceResponseSent(long requestId, String action, Throwable t) {
tracerLog.trace("[{}][{}] sent error response (error: [{}])", requestId, action, t.getMessage());
}
@Override
public void onResponseReceived(long requestId) {
if (traceEnabled()) {
// try to resolve the request
DiscoveryNode sourceNode = null;
String action = null;
RequestHolder holder = clientHandlers.get(requestId);
if (holder != null) {
action = holder.action();
sourceNode = holder.node();
} else {
// lets see if its in the timeout holder
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.get(requestId);
if (timeoutInfoHolder != null) {
action = holder.action();
sourceNode = holder.node();
}
}
if (action == null) {
traceUnresolvedResponse(requestId);
} else if (shouldTraceAction(action)) {
traceReceivedResponse(requestId, sourceNode, action);
}
}
}
@Override
public void onRequestReceived(long requestId, String action) {
if (traceEnabled() && shouldTraceAction(action)) {
traceReceivedRequest(requestId, action);
}
}
@Override
public TransportRequestHandler handler(String action) {
return serverHandlers.get(action);
@ -343,6 +475,27 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
logger.debug("Rejected execution on NodeDisconnected", ex);
}
}
protected void traceReceivedRequest(long requestId, String action) {
tracerLog.trace("[{}][{}] received request", requestId, action);
}
protected void traceResponseSent(long requestId, String action) {
tracerLog.trace("[{}][{}] sent response", requestId, action);
}
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode);
}
protected void traceUnresolvedResponse(long requestId) {
tracerLog.trace("[{}] received response but can't resolve it to a request", requestId);
}
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
}
}
class TimeoutHandler implements Runnable {

View File

@ -30,6 +30,27 @@ public interface TransportServiceAdapter {
void sent(long size);
/** called by the {@link Transport} implementation once a request has been sent */
void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options);
/** called by the {@link Transport) implementation once a response was sent to calling node */
void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options);
/** called by the {@link Transport) implementation after an exception was sent as a response to an incoming request */
void onResponseSent(long requestId, String action, Throwable t);
/**
* called by the {@link Transport) implementation when a response or an exception has been recieved for a previously
* sent request (before any processing or deserialization was done
*/
void onResponseReceived(long requestId);
/**
* called by the {@link Transport) implementation when an incoming request arrives but before
* any parsing of it has happened (with the exception of the requestId and action)
*/
void onRequestReceived(long requestId, String action);
TransportRequestHandler handler(String action);
TransportResponseHandler remove(long requestId);

View File

@ -207,7 +207,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
final byte[] data = stream.bytes().toBytes();
transportServiceAdapter.sent(data.length);
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
@ -235,6 +235,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
if (isRequest) {
handleRequest(stream, requestId, sourceTransport, version);
} else {
// notify with response before we process it and before we remove information about it.
transportServiceAdapter.onResponseReceived(requestId);
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
@ -259,7 +261,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception {
final String action = stream.readString();
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId, version);
transportServiceAdapter.onRequestReceived(requestId, action);
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {

View File

@ -35,14 +35,16 @@ import java.io.NotSerializableException;
public class LocalTransportChannel implements TransportChannel {
private final LocalTransport sourceTransport;
private final TransportServiceAdapter sourceTransportServiceAdapter;
// the transport we will *send to*
private final LocalTransport targetTransport;
private final String action;
private final long requestId;
private final Version version;
public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId, Version version) {
public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter, LocalTransport targetTransport, String action, long requestId, Version version) {
this.sourceTransport = sourceTransport;
this.sourceTransportServiceAdapter = sourceTransportServiceAdapter;
this.targetTransport = targetTransport;
this.action = action;
this.requestId = requestId;
@ -75,6 +77,7 @@ public class LocalTransportChannel implements TransportChannel {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
sourceTransportServiceAdapter.onResponseSent(requestId, action, response, options);
}
}
@ -102,6 +105,7 @@ public class LocalTransportChannel implements TransportChannel {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
sourceTransportServiceAdapter.onResponseSent(requestId, action, error);
}
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {

View File

@ -120,6 +120,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
buffer.readerIndex(expectedIndexReader);
}
} else {
// notify with response before we process it and before we remove information about it.
transportServiceAdapter.onResponseReceived(requestId);
TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
@ -204,8 +206,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, profileName);
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {

View File

@ -679,6 +679,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());

View File

@ -44,13 +44,15 @@ import java.io.NotSerializableException;
public class NettyTransportChannel implements TransportChannel {
private final NettyTransport transport;
private final TransportServiceAdapter transportServiceAdapter;
private final Version version;
private final String action;
private final Channel channel;
private final long requestId;
private final String profileName;
public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version, String profileName) {
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel, long requestId, Version version, String profileName) {
this.transportServiceAdapter = transportServiceAdapter;
this.version = version;
this.transport = transport;
this.action = action;
@ -102,6 +104,7 @@ public class NettyTransportChannel implements TransportChannel {
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
transportServiceAdapter.onResponseSent(requestId, action, response, options);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());
@ -135,5 +138,6 @@ public class NettyTransportChannel implements TransportChannel {
ChannelBuffer buffer = bytes.toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, version);
channel.write(buffer);
transportServiceAdapter.onResponseSent(requestId, action, error);
}
}

View File

@ -92,11 +92,14 @@ import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.threadpool.ThreadPool;
@ -904,7 +907,8 @@ public class IndicesRequestTests extends ElasticsearchIntegrationTest {
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
@Inject
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool,
NodeSettingsService nodeSettingsService, @ClusterDynamicSettings DynamicSettings dynamicSettings) {
super(settings, transport, threadPool);
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.benchmark.transport;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -29,6 +30,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.netty.NettyTransport;
@ -52,10 +54,17 @@ public class BenchmarkNettyLargeMessages {
.build();
NetworkService networkService = new NetworkService(settings);
NodeSettingsService settingsService = new NodeSettingsService(settings);
DynamicSettings dynamicSettings = new DynamicSettings();
final ThreadPool threadPool = new ThreadPool("BenchmarkNettyLargeMessages");
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool).start();
final TransportService transportServiceServer = new TransportService(
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool
).start();
final TransportService transportServiceClient = new TransportService(
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool
).start();
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT);
// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300));

View File

@ -902,7 +902,9 @@ public final class InternalTestCluster extends TestCluster {
for (NodeAndClient nodeAndClient : nodes.values()) {
TransportService transportService = nodeAndClient.node.injector().getInstance(TransportService.class);
if (transportService instanceof MockTransportService) {
((MockTransportService) transportService).clearAllRules();
final MockTransportService mockTransportService = (MockTransportService) transportService;
mockTransportService.clearAllRules();
mockTransportService.clearTracers();
}
}
randomlyResetClients();

View File

@ -37,11 +37,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* A mock transport service that allows to simulate different network topology failures.
@ -410,4 +408,91 @@ public class MockTransportService extends TransportService {
return transport.profileBoundAddresses();
}
}
List<Tracer> activeTracers = new CopyOnWriteArrayList<>();
public static class Tracer {
public void receivedRequest(long requestId, String action) {
}
public void responseSent(long requestId, String action) {
}
public void responseSent(long requestId, String action, Throwable t) {
}
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
}
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
}
}
public void addTracer(Tracer tracer) {
activeTracers.add(tracer);
}
public boolean removeTracer(Tracer tracer) {
return activeTracers.remove(tracer);
}
public void clearTracers() {
activeTracers.clear();
}
@Override
protected Adapter createAdapter() {
return new MockAdapter();
}
class MockAdapter extends Adapter {
@Override
protected boolean traceEnabled() {
return super.traceEnabled() || activeTracers.isEmpty() == false;
}
@Override
protected void traceReceivedRequest(long requestId, String action) {
super.traceReceivedRequest(requestId, action);
for (Tracer tracer : activeTracers) {
tracer.receivedRequest(requestId, action);
}
}
@Override
protected void traceResponseSent(long requestId, String action) {
super.traceResponseSent(requestId, action);
for (Tracer tracer : activeTracers) {
tracer.responseSent(requestId, action);
}
}
@Override
protected void traceResponseSent(long requestId, String action, Throwable t) {
super.traceResponseSent(requestId, action, t);
for (Tracer tracer : activeTracers) {
tracer.responseSent(requestId, action, t);
}
}
@Override
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
super.traceReceivedResponse(requestId, sourceNode, action);
for (Tracer tracer : activeTracers) {
tracer.receivedResponse(requestId, sourceNode, action);
}
}
@Override
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
super.traceRequestSent(node, requestId, action, options);
for (Tracer tracer : activeTracers) {
tracer.requestSent(node, requestId, action, options);
}
}
}
}

View File

@ -37,6 +37,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -65,9 +66,15 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
public void setUp() throws Exception {
super.setUp();
threadPool = new ThreadPool(getClass().getName());
serviceA = build(ImmutableSettings.builder().put("name", "TS_A").build(), version0);
serviceA = build(
ImmutableSettings.builder().put("name", "TS_A", TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(),
version0
);
nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version0);
serviceB = build(ImmutableSettings.builder().put("name", "TS_B").build(), version1);
serviceB = build(
ImmutableSettings.builder().put("name", "TS_B", TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(),
version1
);
nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version1);
// wait till all nodes are properly connected and the event has been sent, so tests in this class
@ -133,27 +140,27 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessageResponse message = res.get();
@ -164,27 +171,27 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessageResponse message = res.get();
@ -222,26 +229,26 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "sayHello",
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public TransportResponse.Empty newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(TransportResponse.Empty response) {
}
@Override
public void handleResponse(TransportResponse.Empty response) {
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
TransportResponse.Empty message = res.get();
@ -280,27 +287,27 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessageResponse message = res.get();
@ -334,30 +341,30 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloException",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override
public void handleResponse(StringMessageResponse response) {
fail("got response instead of exception");
}
@Override
public void handleException(TransportException exp) {
assertThat("bad message !!!", equalTo(exp.getCause().getMessage()));
}
});
@Override
public void handleException(TransportException exp) {
assertThat("bad message !!!", equalTo(exp.getCause().getMessage()));
}
});
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (Exception e) {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
}
@ -451,30 +458,30 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse",
new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override
public void handleResponse(StringMessageResponse response) {
fail("got response instead of exception");
}
@Override
public void handleException(TransportException exp) {
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
}
});
@Override
public void handleException(TransportException exp) {
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
}
});
try {
StringMessageResponse message = res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
@ -514,32 +521,32 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
final CountDownLatch latch = new CountDownLatch(1);
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
latch.countDown();
assertThat("got response instead of exception", false, equalTo(true));
}
@Override
public void handleResponse(StringMessageResponse response) {
latch.countDown();
fail("got response instead of exception");
}
@Override
public void handleException(TransportException exp) {
latch.countDown();
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
}
});
@Override
public void handleException(TransportException exp) {
latch.countDown();
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
}
});
try {
StringMessageResponse message = res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
@ -550,27 +557,27 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
// now, try and send another request, this times, with a short timeout
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest(counter + "ms"), options().withTimeout(3000), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello " + counter + "ms", equalTo(response.message));
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello " + counter + "ms", equalTo(response.message));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage(), false, equalTo(true));
}
});
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
fail("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage());
}
});
StringMessageResponse message = res.txGet();
assertThat(message.message, equalTo("hello " + counter + "ms"));
@ -579,6 +586,173 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
}
@Test
public void testTracerLog() throws InterruptedException {
TransportRequestHandler handler = new BaseTransportRequestHandler<StringMessageRequest>() {
@Override
public StringMessageRequest newInstance() {
return new StringMessageRequest("");
}
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new StringMessageResponse(""));
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
TransportRequestHandler handlerWithError = new BaseTransportRequestHandler<StringMessageRequest>() {
@Override
public StringMessageRequest newInstance() {
return new StringMessageRequest("");
}
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new RuntimeException(""));
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
final Semaphore requestCompleted = new Semaphore(0);
TransportResponseHandler noopResponseHandler = new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public void handleResponse(StringMessageResponse response) {
requestCompleted.release();
}
@Override
public void handleException(TransportException exp) {
requestCompleted.release();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
serviceA.registerHandler("test", handler);
serviceA.registerHandler("testError", handlerWithError);
serviceB.registerHandler("test", handler);
serviceB.registerHandler("testError", handlerWithError);
Tracer tracer = new Tracer();
serviceA.addTracer(tracer);
serviceB.addTracer(tracer);
serviceA.sendRequest(nodeB, "test", new StringMessageRequest(""), noopResponseHandler);
requestCompleted.acquire();
assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true));
assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
assertThat("saw error sent", tracer.sawErrorSent, equalTo(false));
tracer.reset();
serviceA.sendRequest(nodeB, "testError", new StringMessageRequest(""), noopResponseHandler);
requestCompleted.acquire();
assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("saw response sent", tracer.sawResponseSent, equalTo(false));
assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true));
String includeSettings;
String excludeSettings;
if (randomBoolean()) {
// sometimes leave include empty (default)
includeSettings = randomBoolean() ? "*" : "";
excludeSettings = "*Error";
} else {
includeSettings = "test";
excludeSettings = "DOESN'T_MATCH";
}
serviceA.applySettings(ImmutableSettings.builder()
.put(TransportService.SETTING_TRACE_LOG_INCLUDE, includeSettings, TransportService.SETTING_TRACE_LOG_EXCLUDE, excludeSettings)
.build());
tracer.reset();
serviceA.sendRequest(nodeB, "test", new StringMessageRequest(""), noopResponseHandler);
requestCompleted.acquire();
assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true));
assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
assertThat("saw error sent", tracer.sawErrorSent, equalTo(false));
tracer.reset();
serviceA.sendRequest(nodeB, "testError", new StringMessageRequest(""), noopResponseHandler);
requestCompleted.acquire();
assertThat("saw request sent", tracer.sawRequestSent, equalTo(false));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("saw response sent", tracer.sawResponseSent, equalTo(false));
assertThat("saw response received", tracer.sawResponseReceived, equalTo(false));
assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true));
}
private static class Tracer extends MockTransportService.Tracer {
public volatile boolean sawRequestSent;
public volatile boolean sawRequestReceived;
public volatile boolean sawResponseSent;
public volatile boolean sawErrorSent;
public volatile boolean sawResponseReceived;
@Override
public void receivedRequest(long requestId, String action) {
super.receivedRequest(requestId, action);
sawRequestReceived = true;
}
@Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
super.requestSent(node, requestId, action, options);
sawRequestSent = true;
}
@Override
public void responseSent(long requestId, String action) {
super.responseSent(requestId, action);
sawResponseSent = true;
}
@Override
public void responseSent(long requestId, String action, Throwable t) {
super.responseSent(requestId, action, t);
sawErrorSent = true;
}
@Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
super.receivedResponse(requestId, sourceNode, action);
sawResponseReceived = true;
}
public void reset() {
sawRequestSent = false;
sawRequestReceived = false;
sawResponseSent = false;
sawErrorSent = false;
sawResponseReceived = false;
}
}
static class StringMessageRequest extends TransportRequest {
private String message;
@ -951,7 +1125,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
fail("got response instead of exception");
}
@Override
@ -962,21 +1136,21 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (Exception e) {
assertThat(e.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
}
try {
serviceB.connectToNode(nodeA);
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (ConnectTransportException e) {
// all is well
}
try {
serviceB.connectToNodeLight(nodeA);
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (ConnectTransportException e) {
// all is well
}
@ -1020,7 +1194,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
fail("got response instead of exception");
}
@Override
@ -1031,21 +1205,21 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
try {
serviceB.connectToNode(nodeA);
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (ConnectTransportException e) {
// all is well
}
try {
serviceB.connectToNodeLight(nodeA);
assertThat("exception should be thrown", false, equalTo(true));
fail("exception should be thrown");
} catch (ConnectTransportException e) {
// all is well
}

View File

@ -113,7 +113,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, name);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, name);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {