diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index ea007b6691..bf6788f79c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -130,8 +130,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement this.nodeID = nodeID; - transportConnection.setProtocolConnection(this); - logger.trace("RemotingConnectionImpl created: {}", this); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index 4351f77b05..48199caac4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -55,6 +55,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) { this.transportConnection = transportConnection; + this.transportConnection.setProtocolConnection(this); this.executor = executor; this.creationTime = System.currentTimeMillis(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 69740abff7..9ceea15655 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -46,7 +46,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection super(transportConnection, connectionExecutor); this.manager = manager; this.amqpConnection = amqpConnection; - transportConnection.setProtocolConnection(this); } public AMQPConnectionContext getAmqpConnection() { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index e136255664..06f31122ef 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -42,7 +42,6 @@ public class MQTTConnection extends AbstractRemotingConnection { public MQTTConnection(Connection transportConnection) throws Exception { super(transportConnection, null); this.destroyed = false; - transportConnection.setProtocolConnection(this); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 3b3b336476..dc35b6b1ea 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -232,7 +232,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.outWireFormat = wf.copy(); this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration(); - this.transportConnection.setProtocolConnection(this); this.actorThresholdBytes = actorThresholdBytes; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 8d489c0423..87cd1d2869 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -243,6 +243,15 @@ public class ProtocolHandler { ctx.flush(); } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + ActiveMQServerLogger.LOGGER.failureDuringProtocolHandshake(ctx.channel().localAddress(), ctx.channel().remoteAddress(), cause); + } finally { + ctx.close(); + } + } + private boolean isHttp(int magic1, int magic2) { return magic1 == 'G' && magic2 == 'E' || // GET magic1 == 'P' && magic2 == 'O' || // POST diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 5ace4fe18d..20d777bd43 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -582,7 +582,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif public void addConnectionEntry(Connection connection, ConnectionEntry entry) { connections.put(connection.getID(), entry); if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.createdConnection(connection.getProtocolConnection().getProtocolName(), connection.getID(), connection.getRemoteAddress()); + AuditLogger.createdConnection(connection.getProtocolConnection() == null ? null : connection.getProtocolConnection().getProtocolName(), connection.getID(), connection.getRemoteAddress()); } if (logger.isDebugEnabled()) { logger.debug("Adding connection {}, we now have {}", connection.getID(), connections.size()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index bd3d45bfde..aad5fd93f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1582,4 +1582,6 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224125, value = "Address {} has page-limit-bytes={} and page-limit-messages={} but no page-full-policy set. Page full configuration being ignored on this address", level = LogMessage.Level.WARN) void noPagefullPolicySet(Object address, Object limitBytes, Object limitMessages); + @LogMessage(id = 224126, value = "Failure during protocol handshake on connection to {} from {}", level = LogMessage.Level.ERROR) + void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e); } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java index d67c6b91ea..653fbcc837 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerResourceTest.java @@ -16,10 +16,14 @@ */ package org.apache.activemq.artemis.tests.smoke.logging; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Session; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; import javax.management.openmbean.CompositeData; import javax.management.remote.JMXConnector; +import java.net.URI; import java.util.HashMap; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; @@ -32,6 +36,12 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; import org.junit.Test; public class AuditLoggerResourceTest extends AuditLoggerTestBase { @@ -85,4 +95,54 @@ public class AuditLoggerResourceTest extends AuditLoggerTestBase { jmxConnector.close(); } } + + @Test + public void testCoreConnectionAuditLog() throws Exception { + testConnectionAuditLog("CORE"); + } + + @Test + public void testAMQPConnectionAuditLog() throws Exception { + testConnectionAuditLog("AMQP"); + } + + @Test + public void testOpenWireConnectionAuditLog() throws Exception { + testConnectionAuditLog("OPENWIRE"); + } + + private void testConnectionAuditLog(String protocol) throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + checkAuditLogRecord(true, "AMQ601767: " + protocol + " connection"); + s.close(); + connection.close(); + checkAuditLogRecord(true, "AMQ601768: " + protocol + " connection"); + } + + @Test + public void testMQTTConnectionAuditLog() throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setVersion("3.1.1"); + mqtt.setClientId(RandomUtil.randomString()); + mqtt.setCleanSession(true); + mqtt.setHost("localhost", 1883); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.disconnect(); + checkAuditLogRecord(true, "AMQ601767: MQTT connection"); + checkAuditLogRecord(true, "AMQ601768: MQTT connection"); + } + + @Test + public void testStompConnectionAuditLog() throws Exception { + StompClientConnection connection = StompClientConnectionFactory.createClientConnection(new URI("tcp://localhost:61613")); + connection.connect(); + connection.disconnect(); + checkAuditLogRecord(true, "AMQ601767: STOMP connection"); + checkAuditLogRecord(true, "AMQ601768: STOMP connection"); + } }