diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java index 5c275f63be8..3f4eb0695fa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java @@ -19,21 +19,9 @@ package org.elasticsearch.transport.netty4; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import org.elasticsearch.Version; -import org.elasticsearch.common.compress.Compressor; -import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.transport.TcpHeader; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportStatus; - -import java.io.IOException; final class ESLoggingHandler extends LoggingHandler { @@ -42,92 +30,8 @@ final class ESLoggingHandler extends LoggingHandler { } @Override - protected String format(final ChannelHandlerContext ctx, final String eventName, final Object arg) { - if (arg instanceof ByteBuf) { - try { - return format(ctx, eventName, (ByteBuf) arg); - } catch (final Exception e) { - // we really do not want to allow a bug in the formatting handling to escape - logger.trace("an exception occurred formatting a trace message", e); - // we are going to let this be formatted via the default formatting - return super.format(ctx, eventName, arg); - } - } else { - return super.format(ctx, eventName, arg); - } + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // We do not want to log read complete events because we log inbound messages in the TcpTransport. + ctx.fireChannelReadComplete(); } - - private static final int MESSAGE_LENGTH_OFFSET = TcpHeader.MARKER_BYTES_SIZE; - private static final int REQUEST_ID_OFFSET = MESSAGE_LENGTH_OFFSET + TcpHeader.MESSAGE_LENGTH_SIZE; - private static final int STATUS_OFFSET = REQUEST_ID_OFFSET + TcpHeader.REQUEST_ID_SIZE; - private static final int VERSION_ID_OFFSET = STATUS_OFFSET + TcpHeader.STATUS_SIZE; - private static final int ACTION_OFFSET = VERSION_ID_OFFSET + TcpHeader.VERSION_ID_SIZE; - - private String format(final ChannelHandlerContext ctx, final String eventName, final ByteBuf arg) throws IOException { - final int readableBytes = arg.readableBytes(); - if (readableBytes == 0) { - return super.format(ctx, eventName, arg); - } else if (readableBytes >= 2) { - final StringBuilder sb = new StringBuilder(); - sb.append(ctx.channel().toString()); - final int offset = arg.readerIndex(); - // this might be an ES message, check the header - if (arg.getByte(offset) == (byte) 'E' && arg.getByte(offset + 1) == (byte) 'S') { - if (readableBytes == TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE) { - final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET); - if (length == TcpTransport.PING_DATA_SIZE) { - sb.append(" [ping]").append(' ').append(eventName).append(": ").append(readableBytes).append('B'); - return sb.toString(); - } - } - else if (readableBytes >= TcpHeader.HEADER_SIZE) { - // we are going to try to decode this as an ES message - final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET); - final long requestId = arg.getLong(offset + REQUEST_ID_OFFSET); - final byte status = arg.getByte(offset + STATUS_OFFSET); - final boolean isRequest = TransportStatus.isRequest(status); - final String type = isRequest ? "request" : "response"; - final String version = Version.fromId(arg.getInt(offset + VERSION_ID_OFFSET)).toString(); - sb.append(" [length: ").append(length); - sb.append(", request id: ").append(requestId); - sb.append(", type: ").append(type); - sb.append(", version: ").append(version); - if (isRequest) { - // it looks like an ES request, try to decode the action - final int remaining = readableBytes - ACTION_OFFSET; - final ByteBuf slice = arg.slice(offset + ACTION_OFFSET, remaining); - // the stream might be compressed - try (StreamInput in = in(status, slice, remaining)) { - // the first bytes in the message is the context headers - try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { - context.readHeaders(in); - } - // now we decode the features - if (in.getVersion().onOrAfter(Version.V_6_3_0)) { - in.readStringArray(); - } - // now we can decode the action name - sb.append(", action: ").append(in.readString()); - } - } - sb.append(']'); - sb.append(' ').append(eventName).append(": ").append(readableBytes).append('B'); - return sb.toString(); - } - } - } - // we could not decode this as an ES message, use the default formatting - return super.format(ctx, eventName, arg); - } - - private StreamInput in(final Byte status, final ByteBuf slice, final int remaining) throws IOException { - final ByteBufStreamInput in = new ByteBufStreamInput(slice, remaining); - if (TransportStatus.isCompress(status)) { - final Compressor compressor = CompressorFactory.compressor(Netty4Utils.toBytesReference(slice)); - return compressor.streamInput(in); - } else { - return in; - } - } - } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 620b5cb13c6..29ae47df06f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -26,8 +26,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.Transports; @@ -46,23 +44,15 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Transports.assertTransportThread(); - if (!(msg instanceof ByteBuf)) { - ctx.fireChannelRead(msg); - return; - } + assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass(); + final ByteBuf buffer = (ByteBuf) msg; - final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE); - final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize; try { Channel channel = ctx.channel(); - // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh - // buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size - BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize); Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); - transport.messageReceived(reference, channelAttribute.get()); + transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer)); } finally { - // Set the expected position of the buffer, no matter what happened - buffer.readerIndex(expectedReaderIndex); + buffer.release(); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java index 40eabfc1263..1951d789b65 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.TcpTransport; @@ -36,17 +35,20 @@ final class Netty4SizeHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { try { - BytesReference networkBytes = Netty4Utils.toBytesReference(in); - int messageLength = TcpTransport.readMessageLength(networkBytes); - // If the message length is -1, we have not read a complete header. - if (messageLength != -1) { - int messageLengthWithHeader = messageLength + HEADER_SIZE; - // If the message length is greater than the network bytes available, we have not read a complete frame. - if (messageLengthWithHeader <= networkBytes.length()) { - final ByteBuf message = in.skipBytes(HEADER_SIZE); - // 6 bytes would mean it is a ping. And we should ignore. - if (messageLengthWithHeader != 6) { + boolean continueDecode = true; + while (continueDecode) { + int messageLength = TcpTransport.readMessageLength(Netty4Utils.toBytesReference(in)); + if (messageLength == -1) { + continueDecode = false; + } else { + int messageLengthWithHeader = messageLength + HEADER_SIZE; + // If the message length is greater than the network bytes available, we have not read a complete frame. + if (messageLengthWithHeader > in.readableBytes()) { + continueDecode = false; + } else { + final ByteBuf message = in.retainedSlice(in.readerIndex() + HEADER_SIZE, messageLength); out.add(message); + in.readerIndex(in.readerIndex() + messageLengthWithHeader); } } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index 655dafdd289..76d7864c716 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -156,5 +156,4 @@ public class Netty4Utils { throw closingExceptions; } } - } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java index acd71749e23..abe02cdf4c1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java @@ -26,9 +26,10 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.TransportLogger; @ESIntegTestCase.ClusterScope(numDataNodes = 2) -@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace") +@TestLogging(value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace,org.elasticsearch.transport.TransportLogger:trace") public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { private MockLogAppender appender; @@ -37,11 +38,13 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { super.setUp(); appender = new MockLogAppender(); Loggers.addAppender(Loggers.getLogger(ESLoggingHandler.class), appender); + Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender); appender.start(); } public void tearDown() throws Exception { Loggers.removeAppender(Loggers.getLogger(ESLoggingHandler.class), appender); + Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender); appender.stop(); super.tearDown(); } @@ -56,7 +59,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { " WRITE: \\d+B"; final MockLogAppender.LoggingExpectation writeExpectation = new MockLogAppender.PatternSeenEventExcpectation( - "hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, writePattern); + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern); final MockLogAppender.LoggingExpectation flushExpectation = new MockLogAppender.SeenEventExpectation("flush", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, "*FLUSH*"); @@ -71,7 +74,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase { final MockLogAppender.LoggingExpectation readExpectation = new MockLogAppender.PatternSeenEventExcpectation( - "hot threads request", ESLoggingHandler.class.getCanonicalName(), Level.TRACE, readPattern); + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern); appender.addExpectation(writeExpectation); appender.addExpectation(flushExpectation); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java new file mode 100644 index 00000000000..b29df77cae1 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportLoggingIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.NioIntegTestCase; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.TransportLogger; + +@ESIntegTestCase.ClusterScope(numDataNodes = 2) +@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace") +public class NioTransportLoggingIT extends NioIntegTestCase { + + private MockLogAppender appender; + + public void setUp() throws Exception { + super.setUp(); + appender = new MockLogAppender(); + Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.start(); + } + + public void tearDown() throws Exception { + Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.stop(); + super.tearDown(); + } + + public void testLoggingHandler() throws IllegalAccessException { + final String writePattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" + + " WRITE: \\d+B"; + final MockLogAppender.LoggingExpectation writeExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern); + + final String readPattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/nodes/hot_threads\\[n\\]\\]" + + " READ: \\d+B"; + + final MockLogAppender.LoggingExpectation readExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern); + + appender.addExpectation(writeExpectation); + appender.addExpectation(readExpectation); + client().admin().cluster().nodesHotThreads(new NodesHotThreadsRequest()).actionGet(); + appender.assertAllExpectationsMatched(); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index d71e459fccd..2552007463b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -207,6 +207,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final MeanMetric transmittedBytesMetric = new MeanMetric(); private volatile Map requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); + private final TransportLogger transportLogger; private final BytesReference pingMessage; public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, @@ -221,6 +222,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings); this.networkService = networkService; this.transportName = transportName; + this.transportLogger = new TransportLogger(settings); + final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings); if (defaultFeatures == null) { this.features = new String[0]; @@ -788,7 +791,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); - final SendMetricListener closeChannel = new SendMetricListener(message.length()) { + final SendMetricListener listener = new SendMetricListener(message.length()) { @Override protected void innerInnerOnResponse(Void v) { CloseableChannel.closeChannel(channel); @@ -800,7 +803,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements CloseableChannel.closeChannel(channel); } }; - internalSendMessage(channel, message, closeChannel); + // We do not call internalSendMessage because we are not sending a message that is an + // elasticsearch binary message. We are just serializing an exception here. Not formatting it + // as an elasticsearch transport message. + try { + channel.sendMessage(message, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } } } else { logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); @@ -906,6 +916,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * sends a message to the given channel, using the given callbacks. */ private void internalSendMessage(TcpChannel channel, BytesReference message, SendMetricListener listener) { + transportLogger.logOutboundMessage(channel, message); try { channel.sendMessage(message, listener); } catch (Exception ex) { @@ -1050,6 +1061,24 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements return new CompositeBytesReference(header, messageBody, zeroCopyBuffer); } + /** + * Handles inbound message that has been decoded. + * + * @param channel the channel the message if fomr + * @param message the message + */ + public void inboundMessage(TcpChannel channel, BytesReference message) { + try { + transportLogger.logInboundMessage(channel, message); + // Message length of 0 is a ping + if (message.length() != 0) { + messageReceived(message, channel); + } + } catch (Exception e) { + onException(channel, e); + } + } + /** * Consumes bytes that are available from network reads. This method returns the number of bytes consumed * in this call. @@ -1067,15 +1096,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements if (message == null) { return 0; - } else if (message.length() == 0) { - // This is a ping and should not be handled. - return BYTES_NEEDED_FOR_MESSAGE_SIZE; } else { - try { - messageReceived(message, channel); - } catch (Exception e) { - onException(channel, e); - } + inboundMessage(channel, message); return message.length() + BYTES_NEEDED_FOR_MESSAGE_SIZE; } } @@ -1091,7 +1113,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ - public static BytesReference decodeFrame(BytesReference networkBytes) throws IOException { + static BytesReference decodeFrame(BytesReference networkBytes) throws IOException { int messageLength = readMessageLength(networkBytes); if (messageLength == -1) { return null; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java new file mode 100644 index 00000000000..3120620b053 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.compress.NotCompressedException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; + +public final class TransportLogger { + + private final Logger logger; + private static final int HEADER_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; + + TransportLogger(Settings settings) { + logger = Loggers.getLogger(TransportLogger.class, settings); + } + + void logInboundMessage(TcpChannel channel, BytesReference message) { + if (logger.isTraceEnabled()) { + try { + String logMessage = format(channel, message, "READ"); + logger.trace(logMessage); + } catch (IOException e) { + logger.trace("an exception occurred formatting a READ trace message", e); + } + } + } + + void logOutboundMessage(TcpChannel channel, BytesReference message) { + if (logger.isTraceEnabled()) { + try { + BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE); + String logMessage = format(channel, withoutHeader, "WRITE"); + logger.trace(logMessage); + } catch (IOException e) { + logger.trace("an exception occurred formatting a WRITE trace message", e); + } + } + } + + private String format(TcpChannel channel, BytesReference message, String event) throws IOException { + final StringBuilder sb = new StringBuilder(); + sb.append(channel); + int messageLengthWithHeader = HEADER_SIZE + message.length(); + // This is a ping + if (message.length() == 0) { + sb.append(" [ping]").append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); + } else { + boolean success = false; + StreamInput streamInput = message.streamInput(); + try { + final long requestId = streamInput.readLong(); + final byte status = streamInput.readByte(); + final boolean isRequest = TransportStatus.isRequest(status); + final String type = isRequest ? "request" : "response"; + final String version = Version.fromId(streamInput.readInt()).toString(); + sb.append(" [length: ").append(messageLengthWithHeader); + sb.append(", request id: ").append(requestId); + sb.append(", type: ").append(type); + sb.append(", version: ").append(version); + + if (isRequest) { + if (TransportStatus.isCompress(status)) { + Compressor compressor; + try { + final int bytesConsumed = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE; + compressor = CompressorFactory.compressor(message.slice(bytesConsumed, message.length() - bytesConsumed)); + } catch (NotCompressedException ex) { + throw new IllegalStateException(ex); + } + streamInput = compressor.streamInput(streamInput); + } + + try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { + context.readHeaders(streamInput); + } + // now we decode the features + if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) { + streamInput.readStringArray(); + } + sb.append(", action: ").append(streamInput.readString()); + } + sb.append(']'); + sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); + success = true; + } finally { + if (success) { + IOUtils.close(streamInput); + } else { + IOUtils.closeWhileHandlingException(streamInput); + } + } + } + return sb.toString(); + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java new file mode 100644 index 00000000000..42a61008820 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; + +@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace") +public class TransportLoggerTests extends ESTestCase { + + private MockLogAppender appender; + + public void setUp() throws Exception { + super.setUp(); + appender = new MockLogAppender(); + Loggers.addAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.start(); + } + + public void tearDown() throws Exception { + Loggers.removeAppender(Loggers.getLogger(TransportLogger.class), appender); + appender.stop(); + super.tearDown(); + } + + public void testLoggingHandler() throws IOException { + TransportLogger transportLogger = new TransportLogger(Settings.EMPTY); + + final String writePattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/stats]" + + " WRITE: \\d+B"; + final MockLogAppender.LoggingExpectation writeExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "hot threads request", TransportLogger.class.getCanonicalName(), Level.TRACE, writePattern); + + final String readPattern = + ".*\\[length: \\d+" + + ", request id: \\d+" + + ", type: request" + + ", version: .*" + + ", action: cluster:monitor/stats]" + + " READ: \\d+B"; + + final MockLogAppender.LoggingExpectation readExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "cluster monitor request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern); + + appender.addExpectation(writeExpectation); + appender.addExpectation(readExpectation); + BytesReference bytesReference = buildRequest(); + transportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6)); + transportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference); + appender.assertAllExpectationsMatched(); + } + + private BytesReference buildRequest() throws IOException { + try (BytesStreamOutput messageOutput = new BytesStreamOutput()) { + messageOutput.setVersion(Version.CURRENT); + try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { + context.writeTo(messageOutput); + } + messageOutput.writeStringArray(new String[0]); + messageOutput.writeString(ClusterStatsAction.NAME); + new ClusterStatsRequest().writeTo(messageOutput); + BytesReference messageBody = messageOutput.bytes(); + final BytesReference header = buildHeader(randomInt(30), messageBody.length()); + return new CompositeBytesReference(header, messageBody); + } + } + + private BytesReference buildHeader(long requestId, int length) throws IOException { + try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) { + headerOutput.setVersion(Version.CURRENT); + TcpHeader.writeHeader(headerOutput, requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT, length); + final BytesReference bytes = headerOutput.bytes(); + assert bytes.length() == TcpHeader.HEADER_SIZE : "header size mismatch expected: " + TcpHeader.HEADER_SIZE + " but was: " + + bytes.length(); + return bytes; + } + } +}