mirror of https://github.com/apache/activemq.git
Added an optional clientIDPrefix property to allow auto-generated client Ids to be named to make it easier to manage a running system using JMX. Fixes AMQ-836
You can now specify this clientIDPrefix property on the ActiveMQConnectionFactory POJO or via a URI such as tcp://localhost:61616?jms.clientIDPrefix=Cheese git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@424697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
68d7c9e2fd
commit
7339c6efea
|
@ -101,7 +101,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
|
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
|
||||||
private static final IdGenerator connectionIdGenerator = new IdGenerator();
|
private static final IdGenerator connectionIdGenerator = new IdGenerator();
|
||||||
private static final IdGenerator clientIdGenerator = new IdGenerator();
|
|
||||||
|
|
||||||
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
|
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
|
||||||
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
|
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
|
||||||
|
@ -130,9 +129,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
private boolean useRetroactiveConsumer;
|
private boolean useRetroactiveConsumer;
|
||||||
private int closeTimeout = 15000;
|
private int closeTimeout = 15000;
|
||||||
|
|
||||||
private final JMSConnectionStatsImpl stats;
|
|
||||||
private final JMSStatsImpl factoryStats;
|
|
||||||
private final Transport transport;
|
private final Transport transport;
|
||||||
|
private final IdGenerator clientIdGenerator;
|
||||||
|
private final JMSStatsImpl factoryStats;
|
||||||
|
private final JMSConnectionStatsImpl stats;
|
||||||
|
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
private final AtomicBoolean closing = new AtomicBoolean(false);
|
private final AtomicBoolean closing = new AtomicBoolean(false);
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
|
@ -167,9 +168,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
* @param password
|
* @param password
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
protected ActiveMQConnection(final Transport transport, JMSStatsImpl factoryStats)
|
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
|
this.transport = transport;
|
||||||
|
this.clientIdGenerator = clientIdGenerator;
|
||||||
|
this.factoryStats = factoryStats;
|
||||||
|
|
||||||
// Configure a single threaded executor who's core thread can timeout if idle
|
// Configure a single threaded executor who's core thread can timeout if idle
|
||||||
asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
|
asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
|
@ -183,11 +188,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
this.info.setManageable(true);
|
this.info.setManageable(true);
|
||||||
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
|
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
|
||||||
|
|
||||||
this.transport = transport;
|
|
||||||
this.transport.setTransportListener(this);
|
this.transport.setTransportListener(this);
|
||||||
|
|
||||||
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
|
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
|
||||||
this.factoryStats = factoryStats;
|
|
||||||
this.factoryStats.addConnection(this);
|
this.factoryStats.addConnection(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1230,6 +1233,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
|
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Returns the useAsyncSend.
|
* @return Returns the useAsyncSend.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.management.StatsCapable;
|
||||||
import org.apache.activemq.management.StatsImpl;
|
import org.apache.activemq.management.StatsImpl;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportFactory;
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
|
import org.apache.activemq.util.IdGenerator;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.JMSExceptionSupport;
|
import org.apache.activemq.util.JMSExceptionSupport;
|
||||||
import org.apache.activemq.util.URISupport;
|
import org.apache.activemq.util.URISupport;
|
||||||
|
@ -60,6 +61,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
public static final String DEFAULT_USER = null;
|
public static final String DEFAULT_USER = null;
|
||||||
public static final String DEFAULT_PASSWORD = null;
|
public static final String DEFAULT_PASSWORD = null;
|
||||||
|
|
||||||
|
private IdGenerator clientIdGenerator;
|
||||||
|
private String clientIDPrefix;
|
||||||
protected URI brokerURL;
|
protected URI brokerURL;
|
||||||
protected String userName;
|
protected String userName;
|
||||||
protected String password;
|
protected String password;
|
||||||
|
@ -251,7 +254,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
|
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
|
||||||
ActiveMQConnection connection = new ActiveMQConnection(transport, stats);
|
ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -609,4 +612,34 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
|
public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
|
||||||
this.nestedMapAndListEnabled = structuredMapsEnabled;
|
this.nestedMapAndListEnabled = structuredMapsEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getClientIDPrefix() {
|
||||||
|
return clientIDPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the prefix used by autogenerated JMS Client ID values which are
|
||||||
|
* used if the JMS client does not explicitly specify on.
|
||||||
|
*
|
||||||
|
* @param clientIDPrefix
|
||||||
|
*/
|
||||||
|
public void setClientIDPrefix(String clientIDPrefix) {
|
||||||
|
this.clientIDPrefix = clientIDPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized IdGenerator getClientIdGenerator() {
|
||||||
|
if (clientIdGenerator == null) {
|
||||||
|
if (clientIDPrefix != null) {
|
||||||
|
clientIdGenerator = new IdGenerator(clientIDPrefix);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
clientIdGenerator = new IdGenerator();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return clientIdGenerator;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
|
||||||
|
this.clientIdGenerator = clientIdGenerator;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import javax.jms.XATopicSession;
|
||||||
|
|
||||||
import org.apache.activemq.management.JMSStatsImpl;
|
import org.apache.activemq.management.JMSStatsImpl;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
|
import org.apache.activemq.util.IdGenerator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The XAConnection interface extends the capability of Connection by providing
|
* The XAConnection interface extends the capability of Connection by providing
|
||||||
|
@ -49,15 +50,8 @@ import org.apache.activemq.transport.Transport;
|
||||||
*/
|
*/
|
||||||
public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
|
public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
|
||||||
|
|
||||||
/**
|
protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
|
||||||
* @param transport
|
super(transport, clientIdGenerator, factoryStats);
|
||||||
* @param theUserName
|
|
||||||
* @param thePassword
|
|
||||||
* @param factoryStats
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
protected ActiveMQXAConnection(Transport transport, JMSStatsImpl factoryStats) throws Exception {
|
|
||||||
super(transport, factoryStats);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public XASession createXASession() throws JMSException {
|
public XASession createXASession() throws JMSException {
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
|
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
|
||||||
ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, stats);
|
ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), stats);
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ public class IdGenerator{
|
||||||
try {
|
try {
|
||||||
hostName = InetAddress.getLocalHost().getHostName();
|
hostName = InetAddress.getLocalHost().getHostName();
|
||||||
ServerSocket ss = new ServerSocket(0);
|
ServerSocket ss = new ServerSocket(0);
|
||||||
stub=hostName + "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
|
stub="-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
ss.close();
|
ss.close();
|
||||||
}catch(Exception ioe){
|
}catch(Exception ioe){
|
||||||
|
@ -57,7 +57,7 @@ public class IdGenerator{
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
hostName="localhost";
|
hostName="localhost";
|
||||||
stub = hostName + "-1-" +System.currentTimeMillis() +"-";
|
stub = "-1-" +System.currentTimeMillis() +"-";
|
||||||
}
|
}
|
||||||
UNIQUE_STUB = stub;
|
UNIQUE_STUB = stub;
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ public class IdGenerator{
|
||||||
}
|
}
|
||||||
|
|
||||||
public IdGenerator(){
|
public IdGenerator(){
|
||||||
this("ID:");
|
this("ID:" + hostName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
|
||||||
|
Some utility classes
|
||||||
|
|
||||||
|
</body>
|
||||||
|
</html>
|
|
@ -29,6 +29,24 @@ import org.apache.activemq.broker.TransportConnector;
|
||||||
|
|
||||||
public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
|
public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
|
||||||
|
|
||||||
|
public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese");
|
||||||
|
assertEquals("Cheese", cf.getClientIDPrefix());
|
||||||
|
|
||||||
|
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
|
||||||
|
try {
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
String clientID = connection.getClientID();
|
||||||
|
log.info("Got client ID: " + clientID);
|
||||||
|
|
||||||
|
assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese"));
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
|
public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
|
||||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true");
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true");
|
||||||
assertTrue(cf.isUseAsyncSend());
|
assertTrue(cf.isUseAsyncSend());
|
||||||
|
|
Loading…
Reference in New Issue