This commit is contained in:
Clebert Suconic 2018-08-13 17:48:29 -04:00
commit 8d0338ebc2
13 changed files with 97 additions and 4 deletions

View File

@ -95,6 +95,8 @@ public final class ActiveMQClient {
public static final boolean DEFAULT_PRE_ACKNOWLEDGE = false;
public static final boolean DEFAULT_ENABLED_SHARED_CLIENT_ID = false;
public static final long DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT = 10000;
public static final long DEFAULT_DISCOVERY_REFRESH_TIMEOUT = 10000;

View File

@ -679,11 +679,19 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
}
public void authorize() throws JMSException {
authorize(true);
}
public void authorize(boolean validateClientId) throws JMSException {
try {
initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0);
if (clientID != null) {
validateClientID(initialSession, clientID);
if (validateClientId) {
validateClientID(initialSession, clientID);
} else {
initialSession.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
}
}
addSessionMetaData(initialSession);
@ -718,6 +726,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
return this.factoryReference.getDeserializationWhiteList();
}
// Inner classes --------------------------------------------------------------------------------
private static class JMSFailureListener implements SessionFailureListener {

View File

@ -70,6 +70,8 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
private String clientID;
private boolean enableSharedClientID = ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;
private int dupsOKBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
private int transactionBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
@ -452,6 +454,14 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
this.clientID = clientID;
}
public boolean isEnableSharedClientID() {
return enableSharedClientID;
}
public void setEnableSharedClientID(boolean enableSharedClientID) {
this.enableSharedClientID = enableSharedClientID;
}
public synchronized int getDupsOKBatchSize() {
return dupsOKBatchSize;
}
@ -857,7 +867,7 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
connection.setReference(this);
try {
connection.authorize();
connection.authorize(!isEnableSharedClientID());
} catch (JMSException e) {
try {
connection.close();
@ -882,6 +892,8 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
transactionBatchSize +
", readOnly=" +
readOnly +
"EnableSharedClientID=" +
enableSharedClientID +
"]";
}

View File

@ -193,4 +193,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
boolean isEnable1xPrefixes();
ConnectionFactoryConfiguration setEnable1xPrefixes(boolean enable1xPrefixes);
boolean isEnableSharedClientID();
ConnectionFactoryConfiguration setEnableSharedClientID(boolean enabled);
}

View File

@ -126,6 +126,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
private boolean enable1xPrefixes = ActiveMQClient.DEFAULT_ENABLE_1X_PREFIXES;
private boolean enableSharedClientID = ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -638,6 +641,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
deserializationWhiteList = BufferHelper.readNullableSimpleStringAsString(buffer);
enable1xPrefixes = buffer.readableBytes() > 0 ? buffer.readBoolean() : null;
enableSharedClientID = buffer.readableBytes() > 0 ? buffer.readBoolean() : ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;
}
@Override
@ -729,6 +735,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.writeAsNullableSimpleString(buffer, deserializationWhiteList);
buffer.writeBoolean(enable1xPrefixes);
BufferHelper.writeNullableBoolean(buffer, enableSharedClientID);
}
@Override
@ -844,9 +852,11 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.sizeOfNullableSimpleString(deserializationWhiteList) +
DataConstants.SIZE_BOOLEAN;
DataConstants.SIZE_BOOLEAN +
// enable1xPrefixes;
BufferHelper.sizeOfNullableBoolean(enableSharedClientID);
return size;
}
@ -914,6 +924,16 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this;
}
@Override
public ConnectionFactoryConfiguration setEnableSharedClientID(boolean enabled) {
this.enableSharedClientID = enabled;
return this;
}
@Override
public boolean isEnableSharedClientID() {
return enableSharedClientID;
}
// Public --------------------------------------------------------

View File

@ -1221,6 +1221,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
cf.setDeserializationWhiteList(cfConfig.getDeserializationWhiteList());
cf.setInitialMessagePacketSize(cfConfig.getInitialMessagePacketSize());
cf.setEnable1xPrefixes(cfConfig.isEnable1xPrefixes());
cf.setEnableSharedClientID(cfConfig.isEnableSharedClientID());
return cf;
}

View File

@ -1792,6 +1792,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
}
cf.setEnableSharedClientID(true);
setParams(cf, overrideProperties);
return cf;
}
@ -1858,6 +1859,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
cf.setReconnectAttempts(0);
cf.setInitialConnectAttempts(0);
cf.setEnableSharedClientID(true);
return cf;
}

View File

@ -130,6 +130,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
private String deserializationWhiteList;
private Boolean enableSharedClientID;
/**
* @return the transportType
*/
@ -755,6 +757,14 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
return hasBeenUpdated;
}
public void setEnableSharedClientID(boolean enable) {
this.enableSharedClientID = enable;
}
public boolean isEnableSharedClientID() {
return enableSharedClientID;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
@ -999,6 +1009,12 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
} else if (!this.enable1xPrefixes.equals(other.enable1xPrefixes))
return false;
if (enableSharedClientID == null) {
if (other.enableSharedClientID != null)
return false;
} else if (!enableSharedClientID == other.enableSharedClientID)
return false;
return true;
}
@ -1052,6 +1068,7 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
result = prime * result + ((deserializationBlackList == null) ? 0 : deserializationBlackList.hashCode());
result = prime * result + ((deserializationWhiteList == null) ? 0 : deserializationWhiteList.hashCode());
result = prime * result + ((enable1xPrefixes == null) ? 0 : enable1xPrefixes.hashCode());
result = prime * result + ((enableSharedClientID == null) ? 0 : enableSharedClientID.hashCode());
return result;
}
}

View File

@ -454,12 +454,14 @@ public class ActiveMQActivation {
// This will clone the connection factory
// to make sure we won't close anyone's connection factory when we stop the MDB
factory = ActiveMQJMSClient.createConnectionFactory(((ActiveMQConnectionFactory) fac).toURI().toString(), "internalConnection");
factory.setEnableSharedClientID(true);
} else {
factory = ra.newConnectionFactory(spec);
}
} else {
factory = ra.newConnectionFactory(spec);
}
}
/**

View File

@ -217,6 +217,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
}
public ActiveMQConnectionFactory(URI brokerURL) {
setBrokerURL(brokerURL.toString());
}

View File

@ -35,6 +35,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
@ -45,7 +46,6 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.jms.serializables.TestClass1;
@ -444,6 +444,7 @@ public class ActiveMQConnectionFactoryTest extends ActiveMQTestBase {
long retryInterval = RandomUtil.randomPositiveLong();
double retryIntervalMultiplier = RandomUtil.randomDouble();
int reconnectAttempts = RandomUtil.randomPositiveInt();
boolean enableSharedClientID = true;
try {
cf.setClientID(clientID);

View File

@ -131,6 +131,27 @@ public class ConnectionTest extends JMSTestBase {
session2.close();
}
@Test
public void testTwoConnectionsSameIDThroughCFWithShareClientIDEnabeld() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?clientID=myid;enableSharedClientID=true");
conn = connectionFactory.createConnection();
try {
conn2 = connectionFactory.createConnection();
} catch (InvalidClientIDException expected) {
Assert.fail("Should allow sharing of client IDs among the same CF");
}
Session session1 = conn.createSession();
Session session2 = conn.createSession();
Session session3 = conn2.createSession();
Session session4 = conn2.createSession();
session1.close();
session2.close();
session3.close();
session4.close();
}
@Test
public void testGetSetConnectionFactory() throws Exception {

View File

@ -46,6 +46,7 @@ public class ConnectionFactoryPropertiesTest extends ActiveMQTestBase {
UNSUPPORTED_CF_PROPERTIES.add("user");
UNSUPPORTED_CF_PROPERTIES.add("userName");
UNSUPPORTED_CF_PROPERTIES.add("password");
UNSUPPORTED_CF_PROPERTIES.add("enableSharedClientID");
UNSUPPORTED_RA_PROPERTIES = new TreeSet<>();
UNSUPPORTED_RA_PROPERTIES.add("HA");