diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index dbc3fa0df4..e2cd6df0f6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -101,7 +101,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private static final Log log = LogFactory.getLog(ActiveMQConnection.class); private static final IdGenerator connectionIdGenerator = new IdGenerator(); - private static final IdGenerator clientIdGenerator = new IdGenerator(); public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; @@ -130,9 +129,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean useRetroactiveConsumer; private int closeTimeout = 15000; - private final JMSConnectionStatsImpl stats; - private final JMSStatsImpl factoryStats; private final Transport transport; + private final IdGenerator clientIdGenerator; + private final JMSStatsImpl factoryStats; + private final JMSConnectionStatsImpl stats; + private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean closing = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -167,9 +168,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @param password * @throws Exception */ - protected ActiveMQConnection(final Transport transport, JMSStatsImpl factoryStats) + protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { + this.transport = transport; + this.clientIdGenerator = clientIdGenerator; + this.factoryStats = factoryStats; + // Configure a single threaded executor who's core thread can timeout if idle asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { public Thread newThread(Runnable r) { @@ -183,11 +188,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.info.setManageable(true); this.connectionSessionId = new SessionId(info.getConnectionId(), -1); - this.transport = transport; this.transport.setTransportListener(this); this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); - this.factoryStats = factoryStats; this.factoryStats.addConnection(this); } @@ -1230,6 +1233,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon advisoryConsumer = new AdvisoryConsumer(this, consumerId); } + /** * @return Returns the useAsyncSend. */ diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 45ab58d8ae..380109ac85 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -36,6 +36,7 @@ import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.URISupport; @@ -60,6 +61,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public static final String DEFAULT_USER = null; public static final String DEFAULT_PASSWORD = null; + private IdGenerator clientIdGenerator; + private String clientIDPrefix; protected URI brokerURL; protected String userName; protected String password; @@ -251,7 +254,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne } protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { - ActiveMQConnection connection = new ActiveMQConnection(transport, stats); + ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats); return connection; } @@ -609,4 +612,34 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { this.nestedMapAndListEnabled = structuredMapsEnabled; } + + public String getClientIDPrefix() { + return clientIDPrefix; + } + + /** + * Sets the prefix used by autogenerated JMS Client ID values which are + * used if the JMS client does not explicitly specify on. + * + * @param clientIDPrefix + */ + public void setClientIDPrefix(String clientIDPrefix) { + this.clientIDPrefix = clientIDPrefix; + } + + protected synchronized IdGenerator getClientIdGenerator() { + if (clientIdGenerator == null) { + if (clientIDPrefix != null) { + clientIdGenerator = new IdGenerator(clientIDPrefix); + } + else { + clientIdGenerator = new IdGenerator(); + } + } + return clientIdGenerator; + } + + protected void setClientIdGenerator(IdGenerator clientIdGenerator) { + this.clientIdGenerator = clientIdGenerator; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java index 209bb87d96..51bf983bf4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java @@ -27,6 +27,7 @@ import javax.jms.XATopicSession; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.IdGenerator; /** * The XAConnection interface extends the capability of Connection by providing @@ -49,15 +50,8 @@ import org.apache.activemq.transport.Transport; */ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection { - /** - * @param transport - * @param theUserName - * @param thePassword - * @param factoryStats - * @throws Exception - */ - protected ActiveMQXAConnection(Transport transport, JMSStatsImpl factoryStats) throws Exception { - super(transport, factoryStats); + protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { + super(transport, clientIdGenerator, factoryStats); } public XASession createXASession() throws JMSException { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java index 579cbe1c35..da9f237d59 100644 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java @@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple } protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { - ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, stats); + ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), stats); return connection; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java b/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java index 82f4b7e6c3..4631d10edc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java @@ -49,7 +49,7 @@ public class IdGenerator{ try { hostName = InetAddress.getLocalHost().getHostName(); ServerSocket ss = new ServerSocket(0); - stub=hostName + "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-"; + stub="-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-"; Thread.sleep(100); ss.close(); }catch(Exception ioe){ @@ -57,7 +57,7 @@ public class IdGenerator{ } }else{ hostName="localhost"; - stub = hostName + "-1-" +System.currentTimeMillis() +"-"; + stub = "-1-" +System.currentTimeMillis() +"-"; } UNIQUE_STUB = stub; } @@ -84,7 +84,7 @@ public class IdGenerator{ } public IdGenerator(){ - this("ID:"); + this("ID:" + hostName); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/util/package.html b/activemq-core/src/main/java/org/apache/activemq/util/package.html new file mode 100755 index 0000000000..0d3b7f4aa2 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/package.html @@ -0,0 +1,9 @@ + + + + + +Some utility classes + + + diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java index b382b86015..fd8993628e 100755 --- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java @@ -29,6 +29,24 @@ import org.apache.activemq.broker.TransportConnector; public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { + public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese"); + assertEquals("Cheese", cf.getClientIDPrefix()); + + ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + try { + connection.start(); + + String clientID = connection.getClientID(); + log.info("Got client ID: " + clientID); + + assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese")); + } + finally { + connection.close(); + } + } + public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true"); assertTrue(cf.isUseAsyncSend());