ARTEMIS-1895 - Add duplicate metadata failure callback to ActiveMQServerPlugin

Add a callback on duplicate metadata which will allow extra
functionality to be added.
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-05-30 08:50:05 -04:00 committed by Clebert Suconic
parent 2baf377568
commit 40ade11981
4 changed files with 77 additions and 11 deletions

View File

@ -16,6 +16,14 @@
*/ */
package org.apache.activemq.artemis.jms.client; package org.apache.activemq.artemis.jms.client;
import java.lang.ref.WeakReference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData; import javax.jms.ConnectionMetaData;
import javax.jms.Destination; import javax.jms.Destination;
@ -32,13 +40,6 @@ import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicConnection; import javax.jms.TopicConnection;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import java.lang.ref.WeakReference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -242,10 +243,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
throw new IllegalStateException("setClientID can only be called directly after the connection is created"); throw new IllegalStateException("setClientID can only be called directly after the connection is created");
} }
validateClientID(initialSession, clientID);
this.clientID = clientID;
try { try {
validateClientID(initialSession, clientID);
this.clientID = clientID;
this.addSessionMetaData(initialSession); this.addSessionMetaData(initialSession);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
JMSException ex = new JMSException("Internal error setting metadata jms-client-id"); JMSException ex = new JMSException("Internal error setting metadata jms-client-id");
@ -257,12 +257,15 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
justCreated = false; justCreated = false;
} }
private void validateClientID(ClientSession validateSession, String clientID) throws InvalidClientIDException { private void validateClientID(ClientSession validateSession, String clientID)
throws InvalidClientIDException, ActiveMQException {
try { try {
validateSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID); validateSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.DUPLICATE_METADATA) { if (e.getType() == ActiveMQExceptionType.DUPLICATE_METADATA) {
throw new InvalidClientIDException("clientID=" + clientID + " was already set into another connection"); throw new InvalidClientIDException("clientID=" + clientID + " was already set into another connection");
} else {
throw e;
} }
} }
} }

View File

@ -1523,6 +1523,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
ServerSession sessionWithMetaData = server.lookupSession(key, data); ServerSession sessionWithMetaData = server.lookupSession(key, data);
if (sessionWithMetaData != null && sessionWithMetaData != this) { if (sessionWithMetaData != null && sessionWithMetaData != this) {
// There is a duplication of this property // There is a duplication of this property
if (server.hasBrokerPlugins()) {
server.callBrokerPlugins(plugin -> plugin.duplicateSessionMetadataFailure(this, key, data));
}
return false; return false;
} else { } else {
addMetaData(key, data); addMetaData(key, data);

View File

@ -157,6 +157,18 @@ public interface ActiveMQServerPlugin {
} }
/**
* Called when adding session metadata fails because the metadata is a duplicate
*
* @param session
* @param key
* @param data
* @throws ActiveMQException
*/
default void duplicateSessionMetadataFailure(ServerSession session, String key, String data) throws ActiveMQException {
}
/** /**
* After session metadata is added to the session * After session metadata is added to the session
* *

View File

@ -16,7 +16,10 @@
*/ */
package org.apache.activemq.artemis.tests.integration.jms.client; package org.apache.activemq.artemis.tests.integration.jms.client;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -33,9 +36,12 @@ import org.junit.Test;
* is added * is added
*/ */
public class SessionMetadataAddExceptionTest extends JMSTestBase { public class SessionMetadataAddExceptionTest extends JMSTestBase {
private AtomicInteger duplicateCount = new AtomicInteger();
@Override @Override
protected Configuration createDefaultConfig(boolean netty) throws Exception { protected Configuration createDefaultConfig(boolean netty) throws Exception {
duplicateCount.set(0);
Configuration config = super.createDefaultConfig(netty); Configuration config = super.createDefaultConfig(netty);
config.registerBrokerPlugin(new ActiveMQServerPlugin() { config.registerBrokerPlugin(new ActiveMQServerPlugin() {
@ -50,6 +56,18 @@ public class SessionMetadataAddExceptionTest extends JMSTestBase {
} }
} }
@Override
public void duplicateSessionMetadataFailure(ServerSession session, String key, String data)
throws ActiveMQException {
//count number of times method called
duplicateCount.incrementAndGet();
if (data.equals("valid2")) {
throw new ActiveMQException("failure");
}
}
}); });
return config; return config;
@ -75,6 +93,36 @@ public class SessionMetadataAddExceptionTest extends JMSTestBase {
cf.createConnection(); cf.createConnection();
} }
@Test(timeout = 5000)
public void testDuplicateClientIdSet() throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) cf;
Connection con = cf.createConnection();
Connection con2 = cf.createConnection();
try {
con.setClientID("valid");
con2.setClientID("valid");
fail("Should have failed for duplicate clientId");
} catch (InvalidClientIDException e) {
assertEquals(1, duplicateCount.get());
} finally {
activeMQConnectionFactory.close();
}
}
@Test(timeout = 5000, expected = JMSException.class)
public void testDuplicateClientIdSetActiveMQException() throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) cf;
Connection con = cf.createConnection();
Connection con2 = cf.createConnection();
try {
con.setClientID("valid2");
con2.setClientID("valid2");
fail("Should have failed");
} finally {
activeMQConnectionFactory.close();
}
}
@Test(timeout = 5000) @Test(timeout = 5000)
public void testValidIdSetConnection() throws Exception { public void testValidIdSetConnection() throws Exception {
Connection con = cf.createConnection(); Connection con = cf.createConnection();