From b697f485bb4815b231f4accb5725fdc237214aef Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 5 Sep 2018 16:12:37 -0600 Subject: [PATCH] Introduce `TransportLogger` for common logging (#32725) Historically we have had a ESLoggingHandler in the netty module that logs low-level connection operations. This class just extends the netty logging handler with some (broken) message deserialization. This commit fixes this message serialization and moves the class to server. This new logger logs inbound and outbound messages. Eventually, we should move other event logging to this class (connect, close, flush). That way we will have consistent logging regards of which transport is loaded. Resolves #27306 on master. Older branches will need a different fix. --- .../transport/netty4/ESLoggingHandler.java | 102 +-------------- .../netty4/Netty4MessageChannelHandler.java | 18 +-- .../netty4/Netty4SizeHeaderFrameDecoder.java | 24 ++-- .../transport/netty4/Netty4Utils.java | 1 - .../transport/netty4/ESLoggingHandlerIT.java | 9 +- .../transport/nio/NioTransportLoggingIT.java | 79 ++++++++++++ .../elasticsearch/transport/TcpTransport.java | 44 +++++-- .../transport/TransportLogger.java | 122 ++++++++++++++++++ .../transport/TransportLoggerTests.java | 116 +++++++++++++++++ 9 files changed, 376 insertions(+), 139 deletions(-) create mode 100644 plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java create mode 100644 server/src/main/java/org/elasticsearch/transport/TransportLogger.java create mode 100644 server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java index 5c275f63be8..3f4eb0695fa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java @@ -19,21 +19,9 @@ package org.elasticsearch.transport.netty4; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import org.elasticsearch.Version; -import org.elasticsearch.common.compress.Compressor; -import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.transport.TcpHeader; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportStatus; - -import java.io.IOException; final class ESLoggingHandler extends LoggingHandler { @@ -42,92 +30,8 @@ final class ESLoggingHandler extends LoggingHandler { } @Override - protected String format(final ChannelHandlerContext ctx, final String eventName, final Object arg) { - if (arg instanceof ByteBuf) { - try { - return format(ctx, eventName, (ByteBuf) arg); - } catch (final Exception e) { - // we really do not want to allow a bug in the formatting handling to escape - logger.trace("an exception occurred formatting a trace message", e); - // we are going to let this be formatted via the default formatting - return super.format(ctx, eventName, arg); - } - } else { - return super.format(ctx, eventName, arg); - } + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // We do not want to log read complete events because we log inbound messages in the TcpTransport. + ctx.fireChannelReadComplete(); } - - private static final int MESSAGE_LENGTH_OFFSET = TcpHeader.MARKER_BYTES_SIZE; - private static final int REQUEST_ID_OFFSET = MESSAGE_LENGTH_OFFSET + TcpHeader.MESSAGE_LENGTH_SIZE; - private static final int STATUS_OFFSET = REQUEST_ID_OFFSET + TcpHeader.REQUEST_ID_SIZE; - private static final int VERSION_ID_OFFSET = STATUS_OFFSET + TcpHeader.STATUS_SIZE; - private static final int ACTION_OFFSET = VERSION_ID_OFFSET + TcpHeader.VERSION_ID_SIZE; - - private String format(final ChannelHandlerContext ctx, final String eventName, final ByteBuf arg) throws IOException { - final int readableBytes = arg.readableBytes(); - if (readableBytes == 0) { - return super.format(ctx, eventName, arg); - } else if (readableBytes >= 2) { - final StringBuilder sb = new StringBuilder(); - sb.append(ctx.channel().toString()); - final int offset = arg.readerIndex(); - // this might be an ES message, check the header - if (arg.getByte(offset) == (byte) 'E' && arg.getByte(offset + 1) == (byte) 'S') { - if (readableBytes == TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE) { - final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET); - if (length == TcpTransport.PING_DATA_SIZE) { - sb.append(" [ping]").append(' ').append(eventName).append(": ").append(readableBytes).append('B'); - return sb.toString(); - } - } - else if (readableBytes >= TcpHeader.HEADER_SIZE) { - // we are going to try to decode this as an ES message - final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET); - final long requestId = arg.getLong(offset + REQUEST_ID_OFFSET); - final byte status = arg.getByte(offset + STATUS_OFFSET); - final boolean isRequest = TransportStatus.isRequest(status); - final String type = isRequest ? "request" : "response"; - final String version = Version.fromId(arg.getInt(offset + VERSION_ID_OFFSET)).toString(); - sb.append(" [length: ").append(length); - sb.append(", request id: ").append(requestId); - sb.append(", type: ").append(type); - sb.append(", version: ").append(version); - if (isRequest) { - // it looks like an ES request, try to decode the action - final int remaining = readableBytes - ACTION_OFFSET; - final ByteBuf slice = arg.slice(offset + ACTION_OFFSET, remaining); - // the stream might be compressed - try (StreamInput in = in(status, slice, remaining)) { - // the first bytes in the message is the context headers - try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { - context.readHeaders(in); - } - // now we decode the features - if (in.getVersion().onOrAfter(Version.V_6_3_0)) { - in.readStringArray(); - } - // now we can decode the action name - sb.append(", action: ").append(in.readString()); - } - } - sb.append(']'); - sb.append(' ').append(eventName).append(": ").append(readableBytes).append('B'); - return sb.toString(); - } - } - } - // we could not decode this as an ES message, use the default formatting - return super.format(ctx, eventName, arg); - } - - private StreamInput in(final Byte status, final ByteBuf slice, final int remaining) throws IOException { - final ByteBufStreamInput in = new ByteBufStreamInput(slice, remaining); - if (TransportStatus.isCompress(status)) { - final Compressor compressor = CompressorFactory.compressor(Netty4Utils.toBytesReference(slice)); - return compressor.streamInput(in); - } else { - return in; - } - } - } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 620b5cb13c6..29ae47df06f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -26,8 +26,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.Transports; @@ -46,23 +44,15 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Transports.assertTransportThread(); - if (!(msg instanceof ByteBuf)) { - ctx.fireChannelRead(msg); - return; - } + assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass(); + final ByteBuf buffer = (ByteBuf) msg; - final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE); - final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize; try { Channel channel = ctx.channel(); - // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh - // buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size - BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize); Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); - transport.messageReceived(reference, channelAttribute.get()); + transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer)); } finally { - // Set the expected position of the buffer, no matter what happened - buffer.readerIndex(expectedReaderIndex); + buffer.release(); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java index 40eabfc1263..1951d789b65 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.TcpTransport; @@ -36,17 +35,20 @@ final class Netty4SizeHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { try { - BytesReference networkBytes = Netty4Utils.toBytesReference(in); - int messageLength = TcpTransport.readMessageLength(networkBytes); - // If the message length is -1, we have not read a complete header. - if (messageLength != -1) { - int messageLengthWithHeader = messageLength + HEADER_SIZE; - // If the message length is greater than the network bytes available, we have not read a complete frame. - if (messageLengthWithHeader <= networkBytes.length()) { - final ByteBuf message = in.skipBytes(HEADER_SIZE); - // 6 bytes would mean it is a ping. And we should ignore. - if (messageLengthWithHeader != 6) { + boolean continueDecode = true; + while (continueDecode) { + int messageLength = TcpTransport.readMessageLength(Netty4Utils.toBytesReference(in)); + if (messageLength == -1) { + continueDecode = false; + } else { + int messageLengthWithHeader = messageLength + HEADER_SIZE; + // If the message length is greater than the network bytes available, we have not read a complete frame. + if (messageLengthWithHeader > in.readableBytes()) { + continueDecode = false; + } else { + final ByteBuf message = in.retainedSlice(in.readerIndex() + HEADER_SIZE, messageLength); out.add(message); + in.readerIndex(in.readerIndex() + messageLengthWithHeader); } } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index 655dafdd289..76d7864c716 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -156,5 +156,4 @@ public class Netty4Utils { throw closingExceptions; } } - } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java index acd71749e23..abe02cdf4c1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java @@ -26,9 +26,10 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.TransportLogger; @ESIntegTestCase.ClusterScope(numDataNodes = 2) -@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace") +@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace,org.elasticsearch.transport.TransportLogger:trace") public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { private MockLogAppender appender; @@ -37,11 +38,13 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { super.setUp(); appender = new MockLogAppender(); Loggers.addAppender(Loggers.getLogger(ESLoggingHandler.class), appender); + Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender); appender.start(); } public void tearDown() throws Exception { Loggers.removeAppender(Loggers.getLogger(ESLoggingHandler.class), appender); + Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender); appender.stop(); super.tearDown(); } @@ -56,7 +59,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { " WRITE: \\d+B"; final MockLogAppender.LoggingExpectation writeExpectation = new MockLogAppender.PatternSeenEventExcpectation( - "hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, writePattern); + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern); final MockLogAppender.LoggingExpectation flushExpectation = new MockLogAppender.SeenEventExpectation("flush", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, "*FLUSH*"); @@ -71,7 +74,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { final MockLogAppender.LoggingExpectation readExpectation = new MockLogAppender.PatternSeenEventExcpectation( - "hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, readPattern); + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern); appender.addExpectation(writeExpectation); appender.addExpectation(flushExpectation); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java new file mode 100644 index 00000000000..b29df77cae1 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.NioIntegTestCase; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.TransportLogger; + +@ESIntegTestCase.ClusterScope(numDataNodes = 2) +@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace") +public class NioTransportLoggingIT extends NioIntegTestCase { + + private MockLogAppender appender; + + public void setUp() throws Exception { + super.setUp(); + appender = new MockLogAppender(); + Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.start(); + } + + public void tearDown() throws Exception { + Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.stop(); + super.tearDown(); + } + + public void testLoggingHandler() throws IllegalAccessException { + final String writePattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" + + " WRITE: \\d+B"; + final MockLogAppender.LoggingExpectation writeExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern); + + final String readPattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" + + " READ: \\d+B"; + + final MockLogAppender.LoggingExpectation readExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern); + + appender.addExpectation(writeExpectation); + appender.addExpectation(readExpectation); + client().admin().cluster().nodesHotThreads(new NodesHotThreadsRequest()).actionGet(); + appender.assertAllExpectationsMatched(); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index d71e459fccd..2552007463b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -207,6 +207,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final MeanMetric transmittedBytesMetric = new MeanMetric(); private volatile Map requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); + private final TransportLogger transportLogger; private final BytesReference pingMessage; public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, @@ -221,6 +222,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings); this.networkService = networkService; this.transportName = transportName; + this.transportLogger = new TransportLogger(settings); + final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings); if (defaultFeatures == null) { this.features = new String[0]; @@ -788,7 +791,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); - final SendMetricListener closeChannel = new SendMetricListener(message.length()) { + final SendMetricListener listener = new SendMetricListener(message.length()) { @Override protected void innerInnerOnResponse(Void v) { CloseableChannel.closeChannel(channel); @@ -800,7 +803,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements CloseableChannel.closeChannel(channel); } }; - internalSendMessage(channel, message, closeChannel); + // We do not call internalSendMessage because we are not sending a message that is an + // elasticsearch binary message. We are just serializing an exception here. Not formatting it + // as an elasticsearch transport message. + try { + channel.sendMessage(message, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } } } else { logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); @@ -906,6 +916,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * sends a message to the given channel, using the given callbacks. */ private void internalSendMessage(TcpChannel channel, BytesReference message, SendMetricListener listener) { + transportLogger.logOutboundMessage(channel, message); try { channel.sendMessage(message, listener); } catch (Exception ex) { @@ -1050,6 +1061,24 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements return new CompositeBytesReference(header, messageBody, zeroCopyBuffer); } + /** + * Handles inbound message that has been decoded. + * + * @param channel the channel the message if fomr + * @param message the message + */ + public void inboundMessage(TcpChannel channel, BytesReference message) { + try { + transportLogger.logInboundMessage(channel, message); + // Message length of 0 is a ping + if (message.length() != 0) { + messageReceived(message, channel); + } + } catch (Exception e) { + onException(channel, e); + } + } + /** * Consumes bytes that are available from network reads. This method returns the number of bytes consumed * in this call. @@ -1067,15 +1096,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements if (message == null) { return 0; - } else if (message.length() == 0) { - // This is a ping and should not be handled. - return BYTES_NEEDED_FOR_MESSAGE_SIZE; } else { - try { - messageReceived(message, channel); - } catch (Exception e) { - onException(channel, e); - } + inboundMessage(channel, message); return message.length() + BYTES_NEEDED_FOR_MESSAGE_SIZE; } } @@ -1091,7 +1113,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ - public static BytesReference decodeFrame(BytesReference networkBytes) throws IOException { + static BytesReference decodeFrame(BytesReference networkBytes) throws IOException { int messageLength = readMessageLength(networkBytes); if (messageLength == -1) { return null; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java new file mode 100644 index 00000000000..3120620b053 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.compress.NotCompressedException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; + +public final class TransportLogger { + + private final Logger logger; + private static final int HEADER_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; + + TransportLogger(Settings settings) { + logger = Loggers.getLogger(TransportLogger.class, settings); + } + + void logInboundMessage(TcpChannel channel, BytesReference message) { + if (logger.isTraceEnabled()) { + try { + String logMessage = format(channel, message, "READ"); + logger.trace(logMessage); + } catch (IOException e) { + logger.trace("an exception occurred formatting a READ trace message", e); + } + } + } + + void logOutboundMessage(TcpChannel channel, BytesReference message) { + if (logger.isTraceEnabled()) { + try { + BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE); + String logMessage = format(channel, withoutHeader, "WRITE"); + logger.trace(logMessage); + } catch (IOException e) { + logger.trace("an exception occurred formatting a WRITE trace message", e); + } + } + } + + private String format(TcpChannel channel, BytesReference message, String event) throws IOException { + final StringBuilder sb = new StringBuilder(); + sb.append(channel); + int messageLengthWithHeader = HEADER_SIZE + message.length(); + // This is a ping + if (message.length() == 0) { + sb.append(" [ping]").append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); + } else { + boolean success = false; + StreamInput streamInput = message.streamInput(); + try { + final long requestId = streamInput.readLong(); + final byte status = streamInput.readByte(); + final boolean isRequest = TransportStatus.isRequest(status); + final String type = isRequest ? "request" : "response"; + final String version = Version.fromId(streamInput.readInt()).toString(); + sb.append(" [length: ").append(messageLengthWithHeader); + sb.append(", request id: ").append(requestId); + sb.append(", type: ").append(type); + sb.append(", version: ").append(version); + + if (isRequest) { + if (TransportStatus.isCompress(status)) { + Compressor compressor; + try { + final int bytesConsumed = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE; + compressor = CompressorFactory.compressor(message.slice(bytesConsumed, message.length() - bytesConsumed)); + } catch (NotCompressedException ex) { + throw new IllegalStateException(ex); + } + streamInput = compressor.streamInput(streamInput); + } + + try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { + context.readHeaders(streamInput); + } + // now we decode the features + if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) { + streamInput.readStringArray(); + } + sb.append(", action: ").append(streamInput.readString()); + } + sb.append(']'); + sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); + success = true; + } finally { + if (success) { + IOUtils.close(streamInput); + } else { + IOUtils.closeWhileHandlingException(streamInput); + } + } + } + return sb.toString(); + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java new file mode 100644 index 00000000000..42a61008820 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; + +@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace") +public class TransportLoggerTests extends ESTestCase { + + private MockLogAppender appender; + + public void setUp() throws Exception { + super.setUp(); + appender = new MockLogAppender(); + Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.start(); + } + + public void tearDown() throws Exception { + Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.stop(); + super.tearDown(); + } + + public void testLoggingHandler() throws IOException { + TransportLogger transportLogger = new TransportLogger(Settings.EMPTY); + + final String writePattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/stats]" + + " WRITE: \\d+B"; + final MockLogAppender.LoggingExpectation writeExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern); + + final String readPattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/stats]" + + " READ: \\d+B"; + + final MockLogAppender.LoggingExpectation readExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "cluster monitor request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern); + + appender.addExpectation(writeExpectation); + appender.addExpectation(readExpectation); + BytesReference bytesReference = buildRequest(); + transportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6)); + transportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference); + appender.assertAllExpectationsMatched(); + } + + private BytesReference buildRequest() throws IOException { + try (BytesStreamOutput messageOutput = new BytesStreamOutput()) { + messageOutput.setVersion(Version.CURRENT); + try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { + context.writeTo(messageOutput); + } + messageOutput.writeStringArray(new String[0]); + messageOutput.writeString(ClusterStatsAction.NAME); + new ClusterStatsRequest().writeTo(messageOutput); + BytesReference messageBody = messageOutput.bytes(); + final BytesReference header = buildHeader(randomInt(30), messageBody.length()); + return new CompositeBytesReference(header, messageBody); + } + } + + private BytesReference buildHeader(long requestId, int length) throws IOException { + try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) { + headerOutput.setVersion(Version.CURRENT); + TcpHeader.writeHeader(headerOutput, requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT, length); + final BytesReference bytes = headerOutput.bytes(); + assert bytes.length() == TcpHeader.HEADER_SIZE : "header size mismatch expected: " + TcpHeader.HEADER_SIZE + " but was: " + + bytes.length(); + return bytes; + } + } +}