This commit is contained in:
Clebert Suconic 2017-11-28 15:56:42 -05:00
commit e714af7ab9
3 changed files with 38 additions and 25 deletions

View File

@ -271,7 +271,7 @@ public final class ChannelImpl implements Channel {
packet.setChannelID(id);
if (logger.isTraceEnabled()) {
logger.trace("Sending packet nonblocking " + packet + " on channelID=" + id);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}
ActiveMQBuffer buffer = packet.encode(connection);
@ -280,7 +280,7 @@ public final class ChannelImpl implements Channel {
try {
if (failingOver) {
waitForFailOver("timed-out waiting for fail-over condition on non-blocking send");
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}
// Sanity check
@ -296,7 +296,7 @@ public final class ChannelImpl implements Channel {
}
if (logger.isTraceEnabled()) {
logger.trace("Writing buffer for channelID=" + id);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}
checkReconnectID(reconnectID);
@ -331,15 +331,24 @@ public final class ChannelImpl implements Channel {
String interceptionResult = invokeInterceptors(packet, interceptors, connection);
if (interceptionResult != null) {
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " interceptionResult=" + interceptionResult);
}
// if we don't throw an exception here the client might not unblock
throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
}
if (closed) {
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " closed.");
}
throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
}
if (connection.getBlockingCallTimeout() == -1) {
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Cannot do a blocking call timeout on a server side connection");
}
throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
}
@ -354,7 +363,7 @@ public final class ChannelImpl implements Channel {
try {
if (failingOver) {
waitForFailOver("timed-out waiting for fail-over condition on blocking send");
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on blocking send");
}
response = null;
@ -366,7 +375,7 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID);
if (logger.isTraceEnabled()) {
logger.trace("Sending blocking " + packet);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending blocking " + packet);
}
connection.getTransportConnection().write(buffer, false, false);
@ -460,6 +469,9 @@ public final class ChannelImpl implements Channel {
public void setCommandConfirmationHandler(final CommandConfirmationHandler handler) {
if (confWindowSize < 0) {
final String msg = "You can't set confirmationHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
}
throw new IllegalStateException(msg);
}
commandConfirmationHandler = handler;
@ -468,7 +480,7 @@ public final class ChannelImpl implements Channel {
@Override
public void setHandler(final ChannelHandler handler) {
if (logger.isTraceEnabled()) {
logger.trace("Setting handler on " + this + " as " + handler);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler);
}
this.handler = handler;
@ -502,6 +514,10 @@ public final class ChannelImpl implements Channel {
synchronized (connection.getTransferLock()) {
connection.removeChannel(id);
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " transferConnection to new RemotingConnectionID=" + (newConnection == null ? "NULL" : newConnection.getID()));
}
// And switch it
final CoreRemotingConnection rnewConnection = newConnection;
@ -518,7 +534,7 @@ public final class ChannelImpl implements Channel {
public void replayCommands(final int otherLastConfirmedCommandID) {
if (resendCache != null) {
if (logger.isTraceEnabled()) {
logger.trace("Replaying commands on channelID=" + id);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Replaying commands on channelID=" + id);
}
clearUpTo(otherLastConfirmedCommandID);
@ -531,7 +547,7 @@ public final class ChannelImpl implements Channel {
@Override
public void lock() {
if (logger.isTraceEnabled()) {
logger.trace("lock channel " + this);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " lock channel " + this);
}
lock.lock();
@ -545,7 +561,7 @@ public final class ChannelImpl implements Channel {
@Override
public void unlock() {
if (logger.isTraceEnabled()) {
logger.trace("unlock channel " + this);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " unlock channel " + this);
}
lock.lock();
@ -572,7 +588,7 @@ public final class ChannelImpl implements Channel {
confirmed.setChannelID(id);
if (logger.isTraceEnabled()) {
logger.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
}
doWrite(confirmed);
@ -585,7 +601,7 @@ public final class ChannelImpl implements Channel {
lastConfirmedCommandID.incrementAndGet();
if (logger.isTraceEnabled()) {
logger.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
}
receivedBytes += packet.getPacketSize();
@ -656,7 +672,7 @@ public final class ChannelImpl implements Channel {
resendCache.add(packet);
if (logger.isTraceEnabled()) {
logger.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
}
}
@ -664,9 +680,7 @@ public final class ChannelImpl implements Channel {
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
if (logger.isTraceEnabled()) {
logger.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
" first commandID=" + firstStoredCommandID +
" number to clear " + numberToClear);
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + " first commandID=" + firstStoredCommandID + " number to clear " + numberToClear);
}
for (int i = 0; i < numberToClear; i++) {
@ -679,7 +693,7 @@ public final class ChannelImpl implements Channel {
}
if (logger.isTraceEnabled()) {
logger.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
logger.trace("RemotingConnectionID=" + connection.getID() + " ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
}
if (commandConfirmationHandler != null) {
commandConfirmationHandler.commandConfirmed(packet);
@ -691,6 +705,6 @@ public final class ChannelImpl implements Channel {
@Override
public String toString() {
return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler=" + handler + "]";
return "Channel[id=" + CHANNEL_ID.idToString(id) + ", RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + ", handler=" + handler + "]";
}
}

View File

@ -131,6 +131,10 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
this.nodeID = nodeID;
transportConnection.setProtocolConnection(this);
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionImpl created: " + this);
}
}
// RemotingConnection implementation
@ -138,12 +142,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
@Override
public String toString() {
return "RemotingConnectionImpl [clientID=" + clientID +
", nodeID=" +
nodeID +
", transportConnection=" +
getTransportConnection() +
"]";
return "RemotingConnectionImpl [ID=" + getID() + ", clientID=" + clientID + ", nodeID=" + nodeID + ", transportConnection=" + getTransportConnection() + "]";
}
/**
@ -366,7 +365,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
final Packet packet = packetDecoder.decode(buffer);
if (logger.isTraceEnabled()) {
logger.trace("handling packet " + packet);
logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet);
}
dataReceived = true;

View File

@ -527,7 +527,7 @@ public class NettyConnection implements Connection {
@Override
public final String toString() {
return super.toString() + "[local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
return super.toString() + "[ID=" + getID() + ", local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
}
private void closeSSLAndChannel(SslHandler sslHandler, final Channel channel, boolean inEventLoop) {