ARTEMIS-4338 STOMP inoperable w/resource audit logging

When resource audit logging is enabled STOMP is completely inoperable
due to an NPE during the protocol handshake. Unfortunately the failure
is completely silent. There are no logs to indicate a problem.

This commit fixes this problem via the following changes:
 - Mitigate the original NPE via a check for null
 - Move the logic necessary to set the "protocol connection" on the
   "transport connection" to a class shared by all implementations.
 - Add exception handling to log failures like this in the future.
 - Add tests to ensure the audit logging is correct.
This commit is contained in:
Justin Bertram 2023-06-30 12:55:31 -05:00 committed by Robbie Gemmell
parent d5213e66c1
commit 2d1a8661fd
9 changed files with 73 additions and 6 deletions

View File

@ -130,8 +130,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
this.nodeID = nodeID;
transportConnection.setProtocolConnection(this);
logger.trace("RemotingConnectionImpl created: {}", this);
}

View File

@ -55,6 +55,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) {
this.transportConnection = transportConnection;
this.transportConnection.setProtocolConnection(this);
this.executor = executor;
this.creationTime = System.currentTimeMillis();
}

View File

@ -46,7 +46,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
super(transportConnection, connectionExecutor);
this.manager = manager;
this.amqpConnection = amqpConnection;
transportConnection.setProtocolConnection(this);
}
public AMQPConnectionContext getAmqpConnection() {

View File

@ -42,7 +42,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
public MQTTConnection(Connection transportConnection) throws Exception {
super(transportConnection, null);
this.destroyed = false;
transportConnection.setProtocolConnection(this);
}
@Override

View File

@ -232,7 +232,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.outWireFormat = wf.copy();
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
this.transportConnection.setProtocolConnection(this);
this.actorThresholdBytes = actorThresholdBytes;
}

View File

@ -243,6 +243,15 @@ public class ProtocolHandler {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
ActiveMQServerLogger.LOGGER.failureDuringProtocolHandshake(ctx.channel().localAddress(), ctx.channel().remoteAddress(), cause);
} finally {
ctx.close();
}
}
private boolean isHttp(int magic1, int magic2) {
return magic1 == 'G' && magic2 == 'E' || // GET
magic1 == 'P' && magic2 == 'O' || // POST

View File

@ -582,7 +582,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
public void addConnectionEntry(Connection connection, ConnectionEntry entry) {
connections.put(connection.getID(), entry);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.createdConnection(connection.getProtocolConnection().getProtocolName(), connection.getID(), connection.getRemoteAddress());
AuditLogger.createdConnection(connection.getProtocolConnection() == null ? null : connection.getProtocolConnection().getProtocolName(), connection.getID(), connection.getRemoteAddress());
}
if (logger.isDebugEnabled()) {
logger.debug("Adding connection {}, we now have {}", connection.getID(), connections.size());

View File

@ -1582,4 +1582,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224125, value = "Address {} has page-limit-bytes={} and page-limit-messages={} but no page-full-policy set. Page full configuration being ignored on this address", level = LogMessage.Level.WARN)
void noPagefullPolicySet(Object address, Object limitBytes, Object limitMessages);
@LogMessage(id = 224126, value = "Failure during protocol handshake on connection to {} from {}", level = LogMessage.Level.ERROR)
void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e);
}

View File

@ -16,10 +16,14 @@
*/
package org.apache.activemq.artemis.tests.smoke.logging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import java.net.URI;
import java.util.HashMap;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -32,6 +36,12 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Test;
public class AuditLoggerResourceTest extends AuditLoggerTestBase {
@ -85,4 +95,54 @@ public class AuditLoggerResourceTest extends AuditLoggerTestBase {
jmxConnector.close();
}
}
@Test
public void testCoreConnectionAuditLog() throws Exception {
testConnectionAuditLog("CORE");
}
@Test
public void testAMQPConnectionAuditLog() throws Exception {
testConnectionAuditLog("AMQP");
}
@Test
public void testOpenWireConnectionAuditLog() throws Exception {
testConnectionAuditLog("OPENWIRE");
}
private void testConnectionAuditLog(String protocol) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection = factory.createConnection();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
checkAuditLogRecord(true, "AMQ601767: " + protocol + " connection");
s.close();
connection.close();
checkAuditLogRecord(true, "AMQ601768: " + protocol + " connection");
}
@Test
public void testMQTTConnectionAuditLog() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setVersion("3.1.1");
mqtt.setClientId(RandomUtil.randomString());
mqtt.setCleanSession(true);
mqtt.setHost("localhost", 1883);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
checkAuditLogRecord(true, "AMQ601767: MQTT connection");
checkAuditLogRecord(true, "AMQ601768: MQTT connection");
}
@Test
public void testStompConnectionAuditLog() throws Exception {
StompClientConnection connection = StompClientConnectionFactory.createClientConnection(new URI("tcp://localhost:61613"));
connection.connect();
connection.disconnect();
checkAuditLogRecord(true, "AMQ601767: STOMP connection");
checkAuditLogRecord(true, "AMQ601768: STOMP connection");
}
}