ARTEMIS-2226 last consumer connection should close the previous consumer connection

Multiple consumers using the same clientId in the cluster, the last consumer connection should close the previous consumer connection!

ARTEMIS-2226 last consumer connection should close the previous consumer connection

to address apache-rat-plugin:0.12:check

ARTEMIS-2226 last consumer connection should close the previous consumer connection

to address checkstyle

ARTEMIS-2226 last consumer connection should close the previous consumer connection

adjust the code structure

ARTEMIS-2226 last consumer connection should close the previous consumer connection

adjust the code structure

ARTEMIS-2226 last consumer connection should close the previous consumer connection

adjust the code structure

ARTEMIS-2226 last consumer connection should close the previous consumer connection

adjust the code structure

ARTEMIS-2226 last consumer connection should close the previous consumer connection

adjust the code structure

ARTEMIS-2226 last consumer connection should close the previous consumer connection

add javadoc
This commit is contained in:
onlyMIT 2019-01-29 17:58:36 +08:00 committed by Michael Andre Pearce
parent 1c637c1a2e
commit 4484d05cf0
15 changed files with 1112 additions and 191 deletions

View File

@ -84,6 +84,10 @@ public final class ManagementHelper {
public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID"); public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID");
public static final SimpleString HDR_PROTOCOL_NAME = new SimpleString("_AMQ_Protocol_Name");
public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID");
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
// Static -------------------------------------------------------- // Static --------------------------------------------------------

View File

@ -162,14 +162,7 @@ public class MQTTConnectionManager {
} }
private MQTTSessionState getSessionState(String clientId) { private MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ return session.getProtocolManager().getSessionState(clientId);
MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
if (state == null) {
state = new MQTTSessionState(clientId);
MQTTSession.SESSIONS.put(clientId, state);
}
return state;
} }
private String validateClientId(String clientId, boolean cleanSession) { private String validateClientId(String clientId, boolean cleanSession) {

View File

@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -30,6 +30,9 @@ import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
@ -40,11 +43,12 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
/** /**
* MQTTProtocolManager * MQTTProtocolManager
*/ */
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection> implements NotificationListener { public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection> implements NotificationListener {
private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1"); private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
@ -55,18 +59,53 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>(); private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
//TODO Read in a list of existing client IDs from stored Sessions. //TODO Read in a list of existing client IDs from stored Sessions.
private Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>(); private final Map<String, MQTTConnection> connectedClients;
private final Map<String, MQTTSessionState> sessionStates;
MQTTProtocolManager(ActiveMQServer server, MQTTProtocolManager(ActiveMQServer server,
Map<String, MQTTConnection> connectedClients,
Map<String, MQTTSessionState> sessionStates,
List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) { List<BaseInterceptor> outgoingInterceptors) {
this.server = server; this.server = server;
this.connectedClients = connectedClients;
this.sessionStates = sessionStates;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors); this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
server.getManagementService().addNotificationListener(this);
} }
@Override @Override
public void onNotification(Notification notification) { public void onNotification(Notification notification) {
// TODO handle notifications if (!(notification.getType() instanceof CoreNotificationType))
return;
CoreNotificationType type = (CoreNotificationType) notification.getType();
if (type != CoreNotificationType.SESSION_CREATED)
return;
TypedProperties props = notification.getProperties();
SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
//Only process SESSION_CREATED notifications for the MQTT protocol
if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
return;
int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
//distance > 0 means only processing notifications which are received from other nodes in the cluster
if (distance > 0) {
String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
/*
* If there is a connection in the node with the same clientId as the value of the "_AMQ_Client_ID" attribute
* in the SESSION_CREATED notification, you need to close this connection.
* Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time.
*/
MQTTConnection mqttConnection = connectedClients.get(clientId);
if (mqttConnection != null) {
mqttConnection.destroy();
}
}
} }
@Override @Override
@ -201,4 +240,17 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) { public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
return connectedClients.put(clientId, connection); return connectedClients.put(clientId, connection);
} }
public MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
}
public MQTTSessionState removeSessionState(String clientId) {
return sessionStates.remove(clientId);
}
public Map<String, MQTTSessionState> getSessionStates() {
return new HashMap<>(sessionStates);
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -37,12 +38,15 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<M
private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME}; private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
private final Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<>();
@Override @Override
public ProtocolManager createProtocolManager(ActiveMQServer server, public ProtocolManager createProtocolManager(ActiveMQServer server,
final Map<String, Object> parameters, final Map<String, Object> parameters,
List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) throws Exception { List<BaseInterceptor> outgoingInterceptors) throws Exception {
return BeanSupport.setData(new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors), parameters); return BeanSupport.setData(new MQTTProtocolManager(server, connectedClients, sessionStates, incomingInterceptors, outgoingInterceptors), parameters);
} }
@Override @Override

View File

@ -17,10 +17,7 @@
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
@ -30,8 +27,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class MQTTSession { public class MQTTSession {
static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>();
private final String id = UUID.randomUUID().toString(); private final String id = UUID.randomUUID().toString();
private MQTTProtocolHandler protocolHandler; private MQTTProtocolHandler protocolHandler;
@ -108,7 +103,7 @@ public class MQTTSession {
if (isClean()) { if (isClean()) {
clean(); clean();
SESSIONS.remove(connection.getClientID()); protocolManager.removeSessionState(connection.getClientID());
} }
} }
stopped = true; stopped = true;
@ -201,7 +196,4 @@ public class MQTTSession {
return coreMessageObjectPools; return coreMessageObjectPools;
} }
public static Map<String, MQTTSessionState> getSessions() {
return new HashMap<>(SESSIONS);
}
} }

View File

@ -59,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.Topology;
@ -2967,7 +2968,16 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (!(notification.getType() instanceof CoreNotificationType)) if (!(notification.getType() instanceof CoreNotificationType))
return; return;
CoreNotificationType type = (CoreNotificationType) notification.getType(); CoreNotificationType type = (CoreNotificationType) notification.getType();
TypedProperties prop = notification.getProperties(); if (type == CoreNotificationType.SESSION_CREATED) {
TypedProperties props = notification.getProperties();
/*
* If the SESSION_CREATED notification is received from another node in the cluster, no broadcast call is made.
* To keep the original logic to avoid calling the broadcast multiple times for the same SESSION_CREATED notification in the cluster.
*/
if (props.getIntProperty(ManagementHelper.HDR_DISTANCE) > 0) {
return;
}
}
this.broadcaster.sendNotification(new Notification(type.toString(), this, notifSeq.incrementAndGet(), notification.toString())); this.broadcaster.sendNotification(new Notification(type.toString(), this, notifSeq.incrementAndGet(), notification.toString()));
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.remoting.impl; package org.apache.activemq.artemis.core.remoting.impl;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -43,4 +44,8 @@ public abstract class AbstractAcceptor implements Acceptor {
} }
} }
public Map<String, ProtocolManager> getProtocolMap() {
return Collections.unmodifiableMap(protocolMap);
}
} }

View File

@ -233,6 +233,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
" AND " + " AND " +
ManagementHelper.HDR_NOTIFICATION_TYPE + ManagementHelper.HDR_NOTIFICATION_TYPE +
" IN ('" + " IN ('" +
CoreNotificationType.SESSION_CREATED +
"','" +
CoreNotificationType.BINDING_ADDED + CoreNotificationType.BINDING_ADDED +
"','" + "','" +
CoreNotificationType.BINDING_REMOVED + CoreNotificationType.BINDING_REMOVED +
@ -252,6 +254,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
flowRecord.getMaxHops() + flowRecord.getMaxHops() +
" AND (" + " AND (" +
createSelectorFromAddress(appendIgnoresToFilter(flowRecord.getAddress())) + createSelectorFromAddress(appendIgnoresToFilter(flowRecord.getAddress())) +
") AND (" +
createPermissiveManagementNotificationToFilter() +
")"); ")");
sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter); sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
@ -351,10 +355,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
} }
filterString += "!" + storeAndForwardPrefix; filterString += "!" + storeAndForwardPrefix;
filterString += ",!" + managementAddress; filterString += ",!" + managementAddress;
filterString += ",!" + managementNotificationAddress;
return filterString; return filterString;
} }
/**
* Create a filter rule,in addition to SESSION_CREATED notifications, all other notifications using managementNotificationAddress
* as the routing address will be filtered.
* @return
*/
private String createPermissiveManagementNotificationToFilter() {
StringBuilder filterBuilder = new StringBuilder(ManagementHelper.HDR_NOTIFICATION_TYPE).append(" = '")
.append(CoreNotificationType.SESSION_CREATED).append("' OR (").append(ManagementHelper.HDR_ADDRESS)
.append(" NOT LIKE '").append(managementNotificationAddress).append("%')");
return filterBuilder.toString();
}
@Override @Override
protected void nodeUP(TopologyMember member, boolean last) { protected void nodeUP(TopologyMember member, boolean last) {

View File

@ -1078,6 +1078,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
doUnProposalReceived(message); doUnProposalReceived(message);
break; break;
} }
case SESSION_CREATED: {
doSessionCreated(message);
break;
}
default: { default: {
throw ActiveMQMessageBundle.BUNDLE.invalidType(ntype); throw ActiveMQMessageBundle.BUNDLE.invalidType(ntype);
} }
@ -1303,6 +1307,19 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
binding.disconnect(); binding.disconnect();
} }
private synchronized void doSessionCreated(final ClientMessage message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace(ClusterConnectionImpl.this + " session created " + message);
}
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, message.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID));
props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, message.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME));
props.putIntProperty(ManagementHelper.HDR_DISTANCE, message.getIntProperty(ManagementHelper.HDR_DISTANCE) + 1);
managementService.sendNotification(new Notification(null, CoreNotificationType.SESSION_CREATED, props));
}
private synchronized void doConsumerCreated(final ClientMessage message) throws Exception { private synchronized void doConsumerCreated(final ClientMessage message) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(ClusterConnectionImpl.this + " Consumer created " + message); logger.trace(ClusterConnectionImpl.this + " Consumer created " + message);

View File

@ -272,6 +272,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (!xa) { if (!xa) {
tx = newTransaction(); tx = newTransaction();
} }
//When the ServerSessionImpl initialization is complete, need to create and send a SESSION_CREATED notification.
sendSessionNotification(CoreNotificationType.SESSION_CREATED);
} }
// ServerSession implementation --------------------------------------------------------------------------- // ServerSession implementation ---------------------------------------------------------------------------
@ -422,6 +424,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
closed = true; closed = true;
//When the ServerSessionImpl is closed, need to create and send a SESSION_CLOSED notification.
sendSessionNotification(CoreNotificationType.SESSION_CLOSED);
if (server.hasBrokerSessionPlugins()) { if (server.hasBrokerSessionPlugins()) {
server.callBrokerSessionPlugins(plugin -> plugin.afterCloseSession(this, failed)); server.callBrokerSessionPlugins(plugin -> plugin.afterCloseSession(this, failed));
@ -429,6 +433,19 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
} }
private void sendSessionNotification(final CoreNotificationType type) throws Exception {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(this.getConnectionID().toString()));
props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(this.getUsername()));
props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(this.getName()));
props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(this.remotingConnection.getClientID()));
props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, SimpleString.toSimpleString(this.remotingConnection.getProtocolName()));
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, managementService.getManagementNotificationAddress());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
managementService.sendNotification(new Notification(null, type, props));
}
private void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception { private void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception {
if (securityEnabled) { if (securityEnabled) {
securityStore.check(address, checkType, auth); securityStore.check(address, checkType, auth);

View File

@ -26,7 +26,6 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
@ -43,13 +42,11 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
private static final Logger logger = Logger.getLogger(NotificationActiveMQServerPlugin.class); private static final Logger logger = Logger.getLogger(NotificationActiveMQServerPlugin.class);
public static final String SEND_CONNECTION_NOTIFICATIONS = "SEND_CONNECTION_NOTIFICATIONS"; public static final String SEND_CONNECTION_NOTIFICATIONS = "SEND_CONNECTION_NOTIFICATIONS";
public static final String SEND_SESSION_NOTIFICATIONS = "SEND_SESSION_NOTIFICATIONS";
public static final String SEND_ADDRESS_NOTIFICATIONS = "SEND_ADDRESS_NOTIFICATIONS"; public static final String SEND_ADDRESS_NOTIFICATIONS = "SEND_ADDRESS_NOTIFICATIONS";
public static final String SEND_DELIVERED_NOTIFICATIONS = "SEND_DELIVERED_NOTIFICATIONS"; public static final String SEND_DELIVERED_NOTIFICATIONS = "SEND_DELIVERED_NOTIFICATIONS";
public static final String SEND_EXPIRED_NOTIFICATIONS = "SEND_EXPIRED_NOTIFICATIONS"; public static final String SEND_EXPIRED_NOTIFICATIONS = "SEND_EXPIRED_NOTIFICATIONS";
private boolean sendConnectionNotifications; private boolean sendConnectionNotifications;
private boolean sendSessionNotifications;
private boolean sendAddressNotifications; private boolean sendAddressNotifications;
private boolean sendDeliveredNotifications; private boolean sendDeliveredNotifications;
private boolean sendExpiredNotifications; private boolean sendExpiredNotifications;
@ -66,8 +63,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
public void init(Map<String, String> properties) { public void init(Map<String, String> properties) {
sendConnectionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS, sendConnectionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS,
Boolean.FALSE.toString())); Boolean.FALSE.toString()));
sendSessionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS,
Boolean.FALSE.toString()));
sendAddressNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS, sendAddressNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS,
Boolean.FALSE.toString())); Boolean.FALSE.toString()));
sendDeliveredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS, sendDeliveredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS,
@ -96,16 +91,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
sendConnectionNotification(connection, CoreNotificationType.CONNECTION_DESTROYED); sendConnectionNotification(connection, CoreNotificationType.CONNECTION_DESTROYED);
} }
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
sendSessionNotification(session, CoreNotificationType.SESSION_CREATED);
}
@Override
public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED);
}
@Override @Override
public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED); sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED);
@ -196,23 +181,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
} }
} }
private void sendSessionNotification(final ServerSession session, final CoreNotificationType type) {
final ManagementService managementService = getManagementService();
if (managementService != null && sendSessionNotifications) {
try {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(session.getConnectionID().toString()));
props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(session.getUsername()));
props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName()));
managementService.sendNotification(new Notification(null, type, props));
} catch (Exception e) {
logger.warn("Error sending notification: " + type, e.getMessage(), e);
}
}
}
/** /**
* @return the sendConnectionNotifications * @return the sendConnectionNotifications
*/ */
@ -227,20 +195,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
this.sendConnectionNotifications = sendConnectionNotifications; this.sendConnectionNotifications = sendConnectionNotifications;
} }
/**
* @return the sendSessionNotifications
*/
public boolean isSendSessionNotifications() {
return sendSessionNotifications;
}
/**
* @param sendSessionNotifications the sendSessionNotifications to set
*/
public void setSendSessionNotifications(boolean sendSessionNotifications) {
this.sendSessionNotifications = sendSessionNotifications;
}
/** /**
* @return the sendDeliveredNotifications * @return the sendDeliveredNotifications
*/ */

View File

@ -362,7 +362,6 @@ public class NotificationTest extends ActiveMQTestBase {
NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin(); NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin();
notificationPlugin.setSendAddressNotifications(true); notificationPlugin.setSendAddressNotifications(true);
notificationPlugin.setSendConnectionNotifications(true); notificationPlugin.setSendConnectionNotifications(true);
notificationPlugin.setSendSessionNotifications(true);
notificationPlugin.setSendDeliveredNotifications(true); notificationPlugin.setSendDeliveredNotifications(true);
notificationPlugin.setSendExpiredNotifications(true); notificationPlugin.setSendExpiredNotifications(true);

View File

@ -1120,7 +1120,7 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect(); notClean.disconnect();
assertEquals(1, MQTTSession.getSessions().size()); assertEquals(1, getSessions().size());
// MUST receive message from existing subscription from previous not clean session // MUST receive message from existing subscription from previous not clean session
notClean = mqttNotClean.blockingConnection(); notClean = mqttNotClean.blockingConnection();
@ -1132,7 +1132,7 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect(); notClean.disconnect();
assertEquals(1, MQTTSession.getSessions().size()); assertEquals(1, getSessions().size());
// MUST NOT receive message from previous not clean session as existing subscription should be gone // MUST NOT receive message from previous not clean session as existing subscription should be gone
final MQTT mqttClean = createMQTTConnection(CLIENTID, true); final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
@ -1144,7 +1144,7 @@ public class MQTTTest extends MQTTTestSupport {
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect(); clean.disconnect();
assertEquals(0, MQTTSession.getSessions().size()); assertEquals(0, getSessions().size());
// MUST NOT receive message from previous clean session as existing subscription should be gone // MUST NOT receive message from previous clean session as existing subscription should be gone
notClean = mqttNotClean.blockingConnection(); notClean = mqttNotClean.blockingConnection();
@ -1153,7 +1153,7 @@ public class MQTTTest extends MQTTTestSupport {
assertNull(msg); assertNull(msg);
notClean.disconnect(); notClean.disconnect();
assertEquals(1, MQTTSession.getSessions().size()); assertEquals(1, getSessions().size());
} }
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
@ -1167,7 +1167,7 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect(); notClean.disconnect();
assertEquals(1, MQTTSession.getSessions().size()); assertEquals(1, getSessions().size());
// MUST NOT receive message from previous not clean session even when creating a new subscription // MUST NOT receive message from previous not clean session even when creating a new subscription
final MQTT mqttClean = createMQTTConnection(CLIENTID, true); final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
@ -1179,7 +1179,7 @@ public class MQTTTest extends MQTTTestSupport {
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect(); clean.disconnect();
assertEquals(0, MQTTSession.getSessions().size()); assertEquals(0, getSessions().size());
} }
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)

View File

@ -28,6 +28,7 @@ import java.security.ProtectionDomain;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@ -42,13 +43,18 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
@ -366,6 +372,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return mqtt; return mqtt;
} }
public Map<String, MQTTSessionState> getSessions() {
Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
if (acceptor instanceof AbstractAcceptor) {
ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
if (protocolManager instanceof MQTTProtocolManager) {
return ((MQTTProtocolManager) protocolManager).getSessionStates();
}
}
return Collections.emptyMap();
}
private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception { private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = new MQTT(); MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1); mqtt.setConnectAttemptsMax(1);