Introduce `TransportLogger` for common logging (#32725)

Historically we have had a ESLoggingHandler in the netty module that
logs low-level connection operations. This class just extends the netty
logging handler with some (broken) message deserialization. This commit
fixes this message serialization and moves the class to server.

This new logger logs inbound and outbound messages. Eventually, we
should move other event logging to this class (connect, close, flush).
That way we will have consistent logging regards of which transport is
loaded.

Resolves #27306 on master. Older branches will need a different fix.
This commit is contained in:
Tim Brooks 2018-09-05 16:12:37 -06:00 committed by GitHub
parent 88c178dca6
commit b697f485bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 376 additions and 139 deletions

View File

@ -19,21 +19,9 @@
package org.elasticsearch.transport.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.elasticsearch.Version;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportStatus;
import java.io.IOException;
final class ESLoggingHandler extends LoggingHandler {
@ -42,92 +30,8 @@ final class ESLoggingHandler extends LoggingHandler {
}
@Override
protected String format(final ChannelHandlerContext ctx, final String eventName, final Object arg) {
if (arg instanceof ByteBuf) {
try {
return format(ctx, eventName, (ByteBuf) arg);
} catch (final Exception e) {
// we really do not want to allow a bug in the formatting handling to escape
logger.trace("an exception occurred formatting a trace message", e);
// we are going to let this be formatted via the default formatting
return super.format(ctx, eventName, arg);
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// We do not want to log read complete events because we log inbound messages in the TcpTransport.
ctx.fireChannelReadComplete();
}
} else {
return super.format(ctx, eventName, arg);
}
}
private static final int MESSAGE_LENGTH_OFFSET = TcpHeader.MARKER_BYTES_SIZE;
private static final int REQUEST_ID_OFFSET = MESSAGE_LENGTH_OFFSET + TcpHeader.MESSAGE_LENGTH_SIZE;
private static final int STATUS_OFFSET = REQUEST_ID_OFFSET + TcpHeader.REQUEST_ID_SIZE;
private static final int VERSION_ID_OFFSET = STATUS_OFFSET + TcpHeader.STATUS_SIZE;
private static final int ACTION_OFFSET = VERSION_ID_OFFSET + TcpHeader.VERSION_ID_SIZE;
private String format(final ChannelHandlerContext ctx, final String eventName, final ByteBuf arg) throws IOException {
final int readableBytes = arg.readableBytes();
if (readableBytes == 0) {
return super.format(ctx, eventName, arg);
} else if (readableBytes >= 2) {
final StringBuilder sb = new StringBuilder();
sb.append(ctx.channel().toString());
final int offset = arg.readerIndex();
// this might be an ES message, check the header
if (arg.getByte(offset) == (byte) 'E' && arg.getByte(offset + 1) == (byte) 'S') {
if (readableBytes == TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE) {
final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET);
if (length == TcpTransport.PING_DATA_SIZE) {
sb.append(" [ping]").append(' ').append(eventName).append(": ").append(readableBytes).append('B');
return sb.toString();
}
}
else if (readableBytes >= TcpHeader.HEADER_SIZE) {
// we are going to try to decode this as an ES message
final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET);
final long requestId = arg.getLong(offset + REQUEST_ID_OFFSET);
final byte status = arg.getByte(offset + STATUS_OFFSET);
final boolean isRequest = TransportStatus.isRequest(status);
final String type = isRequest ? "request" : "response";
final String version = Version.fromId(arg.getInt(offset + VERSION_ID_OFFSET)).toString();
sb.append(" [length: ").append(length);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);
if (isRequest) {
// it looks like an ES request, try to decode the action
final int remaining = readableBytes - ACTION_OFFSET;
final ByteBuf slice = arg.slice(offset + ACTION_OFFSET, remaining);
// the stream might be compressed
try (StreamInput in = in(status, slice, remaining)) {
// the first bytes in the message is the context headers
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(in);
}
// now we decode the features
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
in.readStringArray();
}
// now we can decode the action name
sb.append(", action: ").append(in.readString());
}
}
sb.append(']');
sb.append(' ').append(eventName).append(": ").append(readableBytes).append('B');
return sb.toString();
}
}
}
// we could not decode this as an ES message, use the default formatting
return super.format(ctx, eventName, arg);
}
private StreamInput in(final Byte status, final ByteBuf slice, final int remaining) throws IOException {
final ByteBufStreamInput in = new ByteBufStreamInput(slice, remaining);
if (TransportStatus.isCompress(status)) {
final Compressor compressor = CompressorFactory.compressor(Netty4Utils.toBytesReference(slice));
return compressor.streamInput(in);
} else {
return in;
}
}
}

View File

@ -26,8 +26,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.Transports;
@ -46,23 +44,15 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Transports.assertTransportThread();
if (!(msg instanceof ByteBuf)) {
ctx.fireChannelRead(msg);
return;
}
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
final ByteBuf buffer = (ByteBuf) msg;
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
try {
Channel channel = ctx.channel();
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size
BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize);
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
transport.messageReceived(reference, channelAttribute.get());
transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
} finally {
// Set the expected position of the buffer, no matter what happened
buffer.readerIndex(expectedReaderIndex);
buffer.release();
}
}

View File

@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
@ -36,17 +35,20 @@ final class Netty4SizeHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
BytesReference networkBytes = Netty4Utils.toBytesReference(in);
int messageLength = TcpTransport.readMessageLength(networkBytes);
// If the message length is -1, we have not read a complete header.
if (messageLength != -1) {
boolean continueDecode = true;
while (continueDecode) {
int messageLength = TcpTransport.readMessageLength(Netty4Utils.toBytesReference(in));
if (messageLength == -1) {
continueDecode = false;
} else {
int messageLengthWithHeader = messageLength + HEADER_SIZE;
// If the message length is greater than the network bytes available, we have not read a complete frame.
if (messageLengthWithHeader <= networkBytes.length()) {
final ByteBuf message = in.skipBytes(HEADER_SIZE);
// 6 bytes would mean it is a ping. And we should ignore.
if (messageLengthWithHeader != 6) {
if (messageLengthWithHeader > in.readableBytes()) {
continueDecode = false;
} else {
final ByteBuf message = in.retainedSlice(in.readerIndex() + HEADER_SIZE, messageLength);
out.add(message);
in.readerIndex(in.readerIndex() + messageLengthWithHeader);
}
}
}

View File

@ -156,5 +156,4 @@ public class Netty4Utils {
throw closingExceptions;
}
}
}

View File

@ -26,9 +26,10 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TransportLogger;
@ESIntegTestCase.ClusterScope(numDataNodes = 2)
@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace")
@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace,org.elasticsearch.transport.TransportLogger:trace")
public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
private MockLogAppender appender;
@ -37,11 +38,13 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
super.setUp();
appender = new MockLogAppender();
Loggers.addAppender(Loggers.getLogger(ESLoggingHandler.class), appender);
Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(Loggers.getLogger(ESLoggingHandler.class), appender);
Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender);
appender.stop();
super.tearDown();
}
@ -56,7 +59,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
" WRITE: \\d+B";
final MockLogAppender.LoggingExpectation writeExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, writePattern);
"hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern);
final MockLogAppender.LoggingExpectation flushExpectation =
new MockLogAppender.SeenEventExpectation("flush", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, "*FLUSH*");
@ -71,7 +74,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
final MockLogAppender.LoggingExpectation readExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, readPattern);
"hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern);
appender.addExpectation(writeExpectation);
appender.addExpectation(flushExpectation);

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Level;
import org.elasticsearch.NioIntegTestCase;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TransportLogger;
@ESIntegTestCase.ClusterScope(numDataNodes = 2)
@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace")
public class NioTransportLoggingIT extends NioIntegTestCase {
private MockLogAppender appender;
public void setUp() throws Exception {
super.setUp();
appender = new MockLogAppender();
Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender);
appender.stop();
super.tearDown();
}
public void testLoggingHandler() throws IllegalAccessException {
final String writePattern =
".*\\[length: \\d+" +
", request id: \\d+" +
", type: request" +
", version: .*" +
", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" +
" WRITE: \\d+B";
final MockLogAppender.LoggingExpectation writeExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern);
final String readPattern =
".*\\[length: \\d+" +
", request id: \\d+" +
", type: request" +
", version: .*" +
", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" +
" READ: \\d+B";
final MockLogAppender.LoggingExpectation readExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern);
appender.addExpectation(writeExpectation);
appender.addExpectation(readExpectation);
client().admin().cluster().nodesHotThreads(new NodesHotThreadsRequest()).actionGet();
appender.assertAllExpectationsMatched();
}
}

View File

@ -207,6 +207,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final MeanMetric transmittedBytesMetric = new MeanMetric();
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private final TransportLogger transportLogger;
private final BytesReference pingMessage;
public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
@ -221,6 +222,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
this.features = new String[0];
@ -788,7 +791,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// in case we are able to return data, serialize the exception content and sent it back to the client
if (channel.isOpen()) {
BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
final SendMetricListener closeChannel = new SendMetricListener(message.length()) {
final SendMetricListener listener = new SendMetricListener(message.length()) {
@Override
protected void innerInnerOnResponse(Void v) {
CloseableChannel.closeChannel(channel);
@ -800,7 +803,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
CloseableChannel.closeChannel(channel);
}
};
internalSendMessage(channel, message, closeChannel);
// We do not call internalSendMessage because we are not sending a message that is an
// elasticsearch binary message. We are just serializing an exception here. Not formatting it
// as an elasticsearch transport message.
try {
channel.sendMessage(message, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
}
} else {
logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e);
@ -906,6 +916,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
* sends a message to the given channel, using the given callbacks.
*/
private void internalSendMessage(TcpChannel channel, BytesReference message, SendMetricListener listener) {
transportLogger.logOutboundMessage(channel, message);
try {
channel.sendMessage(message, listener);
} catch (Exception ex) {
@ -1050,6 +1061,24 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
return new CompositeBytesReference(header, messageBody, zeroCopyBuffer);
}
/**
* Handles inbound message that has been decoded.
*
* @param channel the channel the message if fomr
* @param message the message
*/
public void inboundMessage(TcpChannel channel, BytesReference message) {
try {
transportLogger.logInboundMessage(channel, message);
// Message length of 0 is a ping
if (message.length() != 0) {
messageReceived(message, channel);
}
} catch (Exception e) {
onException(channel, e);
}
}
/**
* Consumes bytes that are available from network reads. This method returns the number of bytes consumed
* in this call.
@ -1067,15 +1096,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
if (message == null) {
return 0;
} else if (message.length() == 0) {
// This is a ping and should not be handled.
return BYTES_NEEDED_FOR_MESSAGE_SIZE;
} else {
try {
messageReceived(message, channel);
} catch (Exception e) {
onException(channel, e);
}
inboundMessage(channel, message);
return message.length() + BYTES_NEEDED_FOR_MESSAGE_SIZE;
}
}
@ -1091,7 +1113,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
* @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size.
* This is dependent on the available memory.
*/
public static BytesReference decodeFrame(BytesReference networkBytes) throws IOException {
static BytesReference decodeFrame(BytesReference networkBytes) throws IOException {
int messageLength = readMessageLength(networkBytes);
if (messageLength == -1) {
return null;

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import java.io.IOException;
public final class TransportLogger {
private final Logger logger;
private static final int HEADER_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
TransportLogger(Settings settings) {
logger = Loggers.getLogger(TransportLogger.class, settings);
}
void logInboundMessage(TcpChannel channel, BytesReference message) {
if (logger.isTraceEnabled()) {
try {
String logMessage = format(channel, message, "READ");
logger.trace(logMessage);
} catch (IOException e) {
logger.trace("an exception occurred formatting a READ trace message", e);
}
}
}
void logOutboundMessage(TcpChannel channel, BytesReference message) {
if (logger.isTraceEnabled()) {
try {
BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE);
String logMessage = format(channel, withoutHeader, "WRITE");
logger.trace(logMessage);
} catch (IOException e) {
logger.trace("an exception occurred formatting a WRITE trace message", e);
}
}
}
private String format(TcpChannel channel, BytesReference message, String event) throws IOException {
final StringBuilder sb = new StringBuilder();
sb.append(channel);
int messageLengthWithHeader = HEADER_SIZE + message.length();
// This is a ping
if (message.length() == 0) {
sb.append(" [ping]").append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
} else {
boolean success = false;
StreamInput streamInput = message.streamInput();
try {
final long requestId = streamInput.readLong();
final byte status = streamInput.readByte();
final boolean isRequest = TransportStatus.isRequest(status);
final String type = isRequest ? "request" : "response";
final String version = Version.fromId(streamInput.readInt()).toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);
if (isRequest) {
if (TransportStatus.isCompress(status)) {
Compressor compressor;
try {
final int bytesConsumed = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE;
compressor = CompressorFactory.compressor(message.slice(bytesConsumed, message.length() - bytesConsumed));
} catch (NotCompressedException ex) {
throw new IllegalStateException(ex);
}
streamInput = compressor.streamInput(streamInput);
}
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(streamInput);
}
// now we decode the features
if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) {
streamInput.readStringArray();
}
sb.append(", action: ").append(streamInput.readString());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
success = true;
} finally {
if (success) {
IOUtils.close(streamInput);
} else {
IOUtils.closeWhileHandlingException(streamInput);
}
}
}
return sb.toString();
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import static org.mockito.Mockito.mock;
@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace")
public class TransportLoggerTests extends ESTestCase {
private MockLogAppender appender;
public void setUp() throws Exception {
super.setUp();
appender = new MockLogAppender();
Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender);
appender.stop();
super.tearDown();
}
public void testLoggingHandler() throws IOException {
TransportLogger transportLogger = new TransportLogger(Settings.EMPTY);
final String writePattern =
".*\\[length: \\d+" +
", request id: \\d+" +
", type: request" +
", version: .*" +
", action: cluster:monitor/stats]" +
" WRITE: \\d+B";
final MockLogAppender.LoggingExpectation writeExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern);
final String readPattern =
".*\\[length: \\d+" +
", request id: \\d+" +
", type: request" +
", version: .*" +
", action: cluster:monitor/stats]" +
" READ: \\d+B";
final MockLogAppender.LoggingExpectation readExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"cluster monitor request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern);
appender.addExpectation(writeExpectation);
appender.addExpectation(readExpectation);
BytesReference bytesReference = buildRequest();
transportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6));
transportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference);
appender.assertAllExpectationsMatched();
}
private BytesReference buildRequest() throws IOException {
try (BytesStreamOutput messageOutput = new BytesStreamOutput()) {
messageOutput.setVersion(Version.CURRENT);
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.writeTo(messageOutput);
}
messageOutput.writeStringArray(new String[0]);
messageOutput.writeString(ClusterStatsAction.NAME);
new ClusterStatsRequest().writeTo(messageOutput);
BytesReference messageBody = messageOutput.bytes();
final BytesReference header = buildHeader(randomInt(30), messageBody.length());
return new CompositeBytesReference(header, messageBody);
}
}
private BytesReference buildHeader(long requestId, int length) throws IOException {
try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) {
headerOutput.setVersion(Version.CURRENT);
TcpHeader.writeHeader(headerOutput, requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT, length);
final BytesReference bytes = headerOutput.bytes();
assert bytes.length() == TcpHeader.HEADER_SIZE : "header size mismatch expected: " + TcpHeader.HEADER_SIZE + " but was: "
+ bytes.length();
return bytes;
}
}
}