Timothy Bish 2013-11-11 12:05:52 -05:00
parent 6552287221
commit b0b3a169ce
1 changed files with 100 additions and 122 deletions

View File

@ -45,13 +45,11 @@ import org.slf4j.LoggerFactory;
/** /**
* A {@link BrokerFacade} which uses a JMX-Connection to communicate with a * A {@link BrokerFacade} which uses a JMX-Connection to communicate with a
* broker * broker
*
*
*/ */
public class RemoteJMXBrokerFacade extends BrokerFacadeSupport { public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
private static final transient Logger LOG = LoggerFactory.getLogger(RemoteJMXBrokerFacade.class); private static final transient Logger LOG = LoggerFactory.getLogger(RemoteJMXBrokerFacade.class);
private String brokerName; private String brokerName;
private JMXConnector connector; private JMXConnector connector;
private WebConsoleConfiguration configuration; private WebConsoleConfiguration configuration;
@ -61,47 +59,38 @@ public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
} }
public WebConsoleConfiguration getConfiguration() { public WebConsoleConfiguration getConfiguration() {
return configuration; return configuration;
} }
public void setConfiguration(WebConsoleConfiguration configuration) { public void setConfiguration(WebConsoleConfiguration configuration) {
this.configuration = configuration; this.configuration = configuration;
} }
/** /**
* Shutdown this facade aka close any open connection. * Shutdown this facade aka close any open connection.
*/ */
public void shutdown() { public void shutdown() {
closeConnection(); closeConnection();
} }
private ObjectName getBrokerObjectName(MBeanServerConnection connection) @Override
throws IOException, MalformedObjectNameException {
Set<ObjectName> brokers = findBrokers(connection);
if (brokers.size() == 0) {
throw new IOException("No broker could be found in the JMX.");
}
ObjectName name = brokers.iterator().next();
return name;
}
public BrokerViewMBean getBrokerAdmin() throws Exception { public BrokerViewMBean getBrokerAdmin() throws Exception {
MBeanServerConnection connection = getMBeanServerConnection(); MBeanServerConnection connection = getMBeanServerConnection();
Set brokers = findBrokers(connection); Set<ObjectName> brokers = findBrokers(connection);
if (brokers.size() == 0) { if (brokers.size() == 0) {
throw new IOException("No broker could be found in the JMX."); throw new IOException("No broker could be found in the JMX.");
} }
ObjectName name = (ObjectName)brokers.iterator().next(); ObjectName name = brokers.iterator().next();
BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true); BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);
return mbean; return mbean;
} }
public String getBrokerName() throws Exception, @Override
MalformedObjectNameException { public String getBrokerName() throws Exception, MalformedObjectNameException {
return getBrokerAdmin().getBrokerName(); return getBrokerAdmin().getBrokerName();
} }
protected MBeanServerConnection getMBeanServerConnection() throws Exception { protected MBeanServerConnection getMBeanServerConnection() throws Exception {
JMXConnector connector = this.connector; JMXConnector connector = this.connector;
if (isConnectionActive(connector)) { if (isConnectionActive(connector)) {
@ -110,7 +99,6 @@ public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
synchronized (this) { synchronized (this) {
closeConnection(); closeConnection();
LOG.debug("Creating a new JMX-Connection to the broker"); LOG.debug("Creating a new JMX-Connection to the broker");
this.connector = createConnection(); this.connector = createConnection();
return this.connector.getMBeanServerConnection(); return this.connector.getMBeanServerConnection();
@ -134,117 +122,108 @@ public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
protected JMXConnector createConnection() { protected JMXConnector createConnection() {
Map<String, Object> env = new HashMap<String, Object>(); Map<String, Object> env = new HashMap<String, Object>();
if (this.configuration.getJmxUser() != null) { if (this.configuration.getJmxUser() != null) {
env.put("jmx.remote.credentials", new String[] { env.put("jmx.remote.credentials", new String[] { this.configuration.getJmxUser(), this.configuration.getJmxPassword() });
this.configuration.getJmxUser(), }
this.configuration.getJmxPassword() });
}
Collection<JMXServiceURL> jmxUrls = this.configuration.getJmxUrls(); Collection<JMXServiceURL> jmxUrls = this.configuration.getJmxUrls();
Exception exception = null; Exception exception = null;
for (JMXServiceURL url : jmxUrls) { for (JMXServiceURL url : jmxUrls) {
try { try {
JMXConnector connector = JMXConnectorFactory.connect(url, env); JMXConnector connector = JMXConnectorFactory.connect(url, env);
connector.connect(); connector.connect();
MBeanServerConnection connection = connector MBeanServerConnection connection = connector.getMBeanServerConnection();
.getMBeanServerConnection();
Set<ObjectName> brokers = findBrokers(connection); Set<ObjectName> brokers = findBrokers(connection);
if (brokers.size() > 0) { if (brokers.size() > 0) {
LOG.info("Connected via JMX to the broker at " + url); LOG.info("Connected via JMX to the broker at " + url);
return connector; return connector;
} }
} catch (Exception e) { } catch (Exception e) {
// Keep the exception for later // Keep the exception for later
exception = e; exception = e;
} }
} }
if (exception != null) { if (exception != null) {
if (exception instanceof RuntimeException) { if (exception instanceof RuntimeException) {
throw (RuntimeException) exception; throw (RuntimeException) exception;
} else { } else {
throw new RuntimeException(exception); throw new RuntimeException(exception);
} }
} }
throw new IllegalStateException("No broker is found at any of the " throw new IllegalStateException("No broker is found at any of the " + jmxUrls.size() + " configured urls");
+ jmxUrls.size() + " configured urls"); }
}
protected synchronized void closeConnection() { protected synchronized void closeConnection() {
if (connector != null) { if (connector != null) {
try { try {
LOG.debug("Closing a connection to a broker (" + connector.getConnectionId() + ")"); LOG.debug("Closing a connection to a broker (" + connector.getConnectionId() + ")");
connector.close(); connector.close();
} catch (IOException e) { } catch (IOException e) {
// Ignore the exception, since it most likly won't matter // Ignore the exception, since it most likly won't matter anymore
// anymore
} }
} }
} }
/** /**
* Finds all ActiveMQ-Brokers registered on a certain JMX-Server or, if a * Finds all ActiveMQ-Brokers registered on a certain JMX-Server or, if a
* JMX-BrokerName has been set, the broker with that name. * JMX-BrokerName has been set, the broker with that name.
* *
* @param connection * @param connection
* not <code>null</code> * not <code>null</code>
* @return Set with ObjectName-elements * @return Set with ObjectName-elements
* @throws IOException * @throws IOException
* @throws MalformedObjectNameException * @throws MalformedObjectNameException
*/ */
@SuppressWarnings("unchecked") protected Set<ObjectName> findBrokers(MBeanServerConnection connection) throws IOException, MalformedObjectNameException {
protected Set<ObjectName> findBrokers(MBeanServerConnection connection) ObjectName name;
throws IOException, MalformedObjectNameException { if (this.brokerName == null) {
ObjectName name; name = new ObjectName("org.apache.activemq:type=Broker,brokerName=*");
if (this.brokerName == null) { } else {
name = new ObjectName("org.apache.activemq:type=Broker,brokerName=*"); name = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + this.brokerName);
} else { }
name = new ObjectName("org.apache.activemq:brokerName="
+ this.brokerName + ",Type=broker");
}
Set<ObjectName> brokers = connection.queryNames(name, null); Set<ObjectName> brokers = connection.queryNames(name, null);
Set<ObjectName> masterBrokers = new HashSet<ObjectName>(); Set<ObjectName> masterBrokers = new HashSet<ObjectName>();
for (ObjectName objectName : brokers) { for (ObjectName objectName : brokers) {
BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, objectName, BrokerViewMBean.class, true); BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(connection, objectName, BrokerViewMBean.class, true);
if (!mbean.isSlave()) masterBrokers.add(objectName); if (!mbean.isSlave())
} masterBrokers.add(objectName);
return masterBrokers; }
} return masterBrokers;
}
public void purgeQueue(ActiveMQDestination destination) throws Exception {
QueueViewMBean queue = getQueue(destination.getPhysicalName());
queue.purge();
}
public ManagementContext getManagementContext() {
throw new IllegalStateException("not supported");
}
@Override
@SuppressWarnings("unchecked") public void purgeQueue(ActiveMQDestination destination) throws Exception {
protected <T> Collection<T> getManagedObjects(ObjectName[] names, QueueViewMBean queue = getQueue(destination.getPhysicalName());
Class<T> type) { queue.purge();
MBeanServerConnection connection; }
try {
connection = getMBeanServerConnection();
} catch (Exception e) {
throw new RuntimeException(e);
}
List<T> answer = new ArrayList<T>(); @Override
if (connection != null) { public ManagementContext getManagementContext() {
for (int i = 0; i < names.length; i++) { throw new IllegalStateException("not supported");
ObjectName name = names[i]; }
T value = (T) MBeanServerInvocationHandler.newProxyInstance(
connection, name, type, true); @Override
if (value != null) { protected <T> Collection<T> getManagedObjects(ObjectName[] names, Class<T> type) {
answer.add(value); MBeanServerConnection connection;
} try {
} connection = getMBeanServerConnection();
} } catch (Exception e) {
return answer; throw new RuntimeException(e);
}
List<T> answer = new ArrayList<T>();
if (connection != null) {
for (int i = 0; i < names.length; i++) {
ObjectName name = names[i];
T value = MBeanServerInvocationHandler.newProxyInstance(connection, name, type, true);
if (value != null) {
answer.add(value);
}
}
}
return answer;
} }
@Override @Override
@ -253,8 +232,7 @@ public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
} }
@Override @Override
public Object newProxyInstance(ObjectName objectName, Class interfaceClass,boolean notificationBroadcaster) throws Exception { public Object newProxyInstance(ObjectName objectName, Class interfaceClass, boolean notificationBroadcaster) throws Exception {
return MBeanServerInvocationHandler.newProxyInstance(getMBeanServerConnection(), objectName, interfaceClass, notificationBroadcaster); return MBeanServerInvocationHandler.newProxyInstance(getMBeanServerConnection(), objectName, interfaceClass, notificationBroadcaster);
} }
} }