From 0e843d1fac884e27aada18fcecf44906204fea0a Mon Sep 17 00:00:00 2001 From: Pat Fox Date: Mon, 13 Nov 2017 20:03:17 +0100 Subject: [PATCH] ARTEMIS-1520 Adding connection ID into log.tracing add connection ID to the trace logging output dealing with reading/writing packets. This helps correlate the packets read/written to an individual tcp connection --- .../core/protocol/core/impl/ChannelImpl.java | 48 ++++++++++++------- .../core/impl/RemotingConnectionImpl.java | 13 +++-- .../remoting/impl/netty/NettyConnection.java | 2 +- 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 2ef59c06f2..b8049d27d2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -271,7 +271,7 @@ public final class ChannelImpl implements Channel { packet.setChannelID(id); if (logger.isTraceEnabled()) { - logger.trace("Sending packet nonblocking " + packet + " on channelID=" + id); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); } ActiveMQBuffer buffer = packet.encode(connection); @@ -280,7 +280,7 @@ public final class ChannelImpl implements Channel { try { if (failingOver) { - waitForFailOver("timed-out waiting for fail-over condition on non-blocking send"); + waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); } // Sanity check @@ -296,7 +296,7 @@ public final class ChannelImpl implements Channel { } if (logger.isTraceEnabled()) { - logger.trace("Writing buffer for channelID=" + id); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); } checkReconnectID(reconnectID); @@ -331,15 +331,24 @@ public final class ChannelImpl implements Channel { String interceptionResult = invokeInterceptors(packet, interceptors, connection); if (interceptionResult != null) { + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " interceptionResult=" + interceptionResult); + } // if we don't throw an exception here the client might not unblock throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult); } if (closed) { + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " closed."); + } throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed(); } if (connection.getBlockingCallTimeout() == -1) { + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Cannot do a blocking call timeout on a server side connection"); + } throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection"); } @@ -354,7 +363,7 @@ public final class ChannelImpl implements Channel { try { if (failingOver) { - waitForFailOver("timed-out waiting for fail-over condition on blocking send"); + waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on blocking send"); } response = null; @@ -366,7 +375,7 @@ public final class ChannelImpl implements Channel { checkReconnectID(reconnectID); if (logger.isTraceEnabled()) { - logger.trace("Sending blocking " + packet); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending blocking " + packet); } connection.getTransportConnection().write(buffer, false, false); @@ -460,6 +469,9 @@ public final class ChannelImpl implements Channel { public void setCommandConfirmationHandler(final CommandConfirmationHandler handler) { if (confWindowSize < 0) { final String msg = "You can't set confirmationHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information."; + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg); + } throw new IllegalStateException(msg); } commandConfirmationHandler = handler; @@ -468,7 +480,7 @@ public final class ChannelImpl implements Channel { @Override public void setHandler(final ChannelHandler handler) { if (logger.isTraceEnabled()) { - logger.trace("Setting handler on " + this + " as " + handler); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler); } this.handler = handler; @@ -502,6 +514,10 @@ public final class ChannelImpl implements Channel { synchronized (connection.getTransferLock()) { connection.removeChannel(id); + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " transferConnection to new RemotingConnectionID=" + (newConnection == null ? "NULL" : newConnection.getID())); + } + // And switch it final CoreRemotingConnection rnewConnection = newConnection; @@ -518,7 +534,7 @@ public final class ChannelImpl implements Channel { public void replayCommands(final int otherLastConfirmedCommandID) { if (resendCache != null) { if (logger.isTraceEnabled()) { - logger.trace("Replaying commands on channelID=" + id); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Replaying commands on channelID=" + id); } clearUpTo(otherLastConfirmedCommandID); @@ -531,7 +547,7 @@ public final class ChannelImpl implements Channel { @Override public void lock() { if (logger.isTraceEnabled()) { - logger.trace("lock channel " + this); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " lock channel " + this); } lock.lock(); @@ -545,7 +561,7 @@ public final class ChannelImpl implements Channel { @Override public void unlock() { if (logger.isTraceEnabled()) { - logger.trace("unlock channel " + this); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " unlock channel " + this); } lock.lock(); @@ -572,7 +588,7 @@ public final class ChannelImpl implements Channel { confirmed.setChannelID(id); if (logger.isTraceEnabled()) { - logger.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::flushConfirmation flushing confirmation " + confirmed); } doWrite(confirmed); @@ -585,7 +601,7 @@ public final class ChannelImpl implements Channel { lastConfirmedCommandID.incrementAndGet(); if (logger.isTraceEnabled()) { - logger.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID); } receivedBytes += packet.getPacketSize(); @@ -656,7 +672,7 @@ public final class ChannelImpl implements Channel { resendCache.add(packet); if (logger.isTraceEnabled()) { - logger.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size())); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size())); } } @@ -664,9 +680,7 @@ public final class ChannelImpl implements Channel { final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; if (logger.isTraceEnabled()) { - logger.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + - " first commandID=" + firstStoredCommandID + - " number to clear " + numberToClear); + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + " first commandID=" + firstStoredCommandID + " number to clear " + numberToClear); } for (int i = 0; i < numberToClear; i++) { @@ -679,7 +693,7 @@ public final class ChannelImpl implements Channel { } if (logger.isTraceEnabled()) { - logger.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler); + logger.trace("RemotingConnectionID=" + connection.getID() + " ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler); } if (commandConfirmationHandler != null) { commandConfirmationHandler.commandConfirmed(packet); @@ -691,6 +705,6 @@ public final class ChannelImpl implements Channel { @Override public String toString() { - return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler=" + handler + "]"; + return "Channel[id=" + CHANNEL_ID.idToString(id) + ", RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + ", handler=" + handler + "]"; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 937f061da3..480d3dc350 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -131,6 +131,10 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement this.nodeID = nodeID; transportConnection.setProtocolConnection(this); + + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionImpl created: " + this); + } } // RemotingConnection implementation @@ -138,12 +142,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement @Override public String toString() { - return "RemotingConnectionImpl [clientID=" + clientID + - ", nodeID=" + - nodeID + - ", transportConnection=" + - getTransportConnection() + - "]"; + return "RemotingConnectionImpl [ID=" + getID() + ", clientID=" + clientID + ", nodeID=" + nodeID + ", transportConnection=" + getTransportConnection() + "]"; } /** @@ -366,7 +365,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement final Packet packet = packetDecoder.decode(buffer); if (logger.isTraceEnabled()) { - logger.trace("handling packet " + packet); + logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet); } dataReceived = true; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 3d141a74d9..601bbfc884 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -527,7 +527,7 @@ public class NettyConnection implements Connection { @Override public final String toString() { - return super.toString() + "[local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]"; + return super.toString() + "[ID=" + getID() + ", local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]"; } private void closeSSLAndChannel(SslHandler sslHandler, final Channel channel, boolean inEventLoop) {