From 584c133e9da08d195a41b9177c2ca96c6dfbd894 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 8 Jun 2012 22:56:35 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3877 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1348270 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 22 ++ .../activemq/broker/jmx/AnnotatedMBean.java | 245 +++++++++--------- .../broker/jmx/AsyncAnnotatedMBean.java | 96 +++++++ .../broker/jmx/ManagedRegionBroker.java | 18 +- .../broker/jmx/MBeanOperationTimeoutTest.java | 135 ++++++++++ 5 files changed, 392 insertions(+), 124 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 5b9c323ba5..3e3d2a2d5e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -132,6 +132,7 @@ public class BrokerService implements Service { private boolean populateJMSXUserID; private boolean useAuthenticatedPrincipalForJMSXUserID; private boolean populateUserNameInMBeans; + private long mbeanInvocationTimeout = 0; private boolean useShutdownHook = true; private boolean useLoggingForShutdownErrors; @@ -2648,6 +2649,27 @@ public class BrokerService implements Service { this.populateUserNameInMBeans = value; } + /** + * Gets the time in Milliseconds that an invocation of an MBean method will wait before + * failing. The default value is to wait forever (zero). + * + * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). + */ + public long getMbeanInvocationTimeout() { + return mbeanInvocationTimeout; + } + + /** + * Gets the time in Milliseconds that an invocation of an MBean method will wait before + * failing. The default value is to wait forever (zero). + * + * @param mbeanInvocationTimeout + * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). + */ + public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { + this.mbeanInvocationTimeout = mbeanInvocationTimeout; + } + public boolean isNetworkConnectorStartAsync() { return networkConnectorStartAsync; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java index 454c5bf7e9..8207627645 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java @@ -16,155 +16,164 @@ */ package org.apache.activemq.broker.jmx; -import org.apache.activemq.broker.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.security.AccessController; import java.security.Principal; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import javax.management.*; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanException; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanParameterInfo; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.management.StandardMBean; import javax.security.auth.Subject; +import org.apache.activemq.broker.util.AuditLogEntry; +import org.apache.activemq.broker.util.AuditLogService; +import org.apache.activemq.broker.util.JMXAuditLogEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * MBean that looks for method/parameter descriptions in the Info annotation. */ public class AnnotatedMBean extends StandardMBean { - private static final Map> primitives = new HashMap>(); + private static final Map> primitives = new HashMap>(); - private static final Logger LOG = LoggerFactory.getLogger("org.apache.activemq.audit"); + private static final Logger LOG = LoggerFactory.getLogger("org.apache.activemq.audit"); - private static boolean audit; - private static AuditLogService auditLog; + private static boolean audit; + private static AuditLogService auditLog; - static { - Class[] p = { byte.class, short.class, int.class, long.class, float.class, double.class, char.class, boolean.class, }; - for (Class c : p) { - primitives.put(c.getName(), c); - } - audit = "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.audit")); - if (audit) { - auditLog = AuditLogService.getAuditLog(); - } - } - - @SuppressWarnings("unchecked") - public static void registerMBean(ManagementContext context, Object object, ObjectName objectName) - throws Exception { - - String mbeanName = object.getClass().getName() + "MBean"; - - for (Class c : object.getClass().getInterfaces()) { - if (mbeanName.equals(c.getName())) { - context.registerMBean(new AnnotatedMBean(object, c), objectName); - return; - } + static { + Class[] p = { byte.class, short.class, int.class, long.class, float.class, double.class, char.class, boolean.class, }; + for (Class c : p) { + primitives.put(c.getName(), c); + } + audit = "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.audit")); + if (audit) { + auditLog = AuditLogService.getAuditLog(); + } } - context.registerMBean(object, objectName); - } - - /** Instance where the MBean interface is implemented by another object. */ - public AnnotatedMBean(T impl, Class mbeanInterface) throws NotCompliantMBeanException { - super(impl, mbeanInterface); - } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static void registerMBean(ManagementContext context, Object object, ObjectName objectName) throws Exception { - /** Instance where the MBean interface is implemented by this object. */ - protected AnnotatedMBean(Class mbeanInterface) throws NotCompliantMBeanException { - super(mbeanInterface); - } + String mbeanName = object.getClass().getName() + "MBean"; - /** {@inheritDoc} */ - @Override - protected String getDescription(MBeanAttributeInfo info) { + for (Class c : object.getClass().getInterfaces()) { + if (mbeanName.equals(c.getName())) { + context.registerMBean(new AnnotatedMBean(object, c), objectName); + return; + } + } - String descr = info.getDescription(); - Method m = getMethod(getMBeanInterface(), "get"+info.getName().substring(0, 1).toUpperCase()+info.getName().substring(1)); - if (m == null) - m = getMethod(getMBeanInterface(), "is"+info.getName().substring(0, 1).toUpperCase()+info.getName().substring(1)); - if (m == null) - m = getMethod(getMBeanInterface(), "does"+info.getName().substring(0, 1).toUpperCase()+info.getName().substring(1)); - - if (m != null) { - MBeanInfo d = m.getAnnotation(MBeanInfo.class); - if (d != null) - descr = d.value(); + context.registerMBean(object, objectName); } - return descr; - } - - /** {@inheritDoc} */ - @Override - protected String getDescription(MBeanOperationInfo op) { - String descr = op.getDescription(); - Method m = getMethod(op); - if (m != null) { - MBeanInfo d = m.getAnnotation(MBeanInfo.class); - if (d != null) - descr = d.value(); + /** Instance where the MBean interface is implemented by another object. */ + public AnnotatedMBean(T impl, Class mbeanInterface) throws NotCompliantMBeanException { + super(impl, mbeanInterface); } - return descr; - } - /** {@inheritDoc} */ - @Override - protected String getParameterName(MBeanOperationInfo op, MBeanParameterInfo param, int paramNo) { - String name = param.getName(); - Method m = getMethod(op); - if (m != null) { - for (Annotation a : m.getParameterAnnotations()[paramNo]) { - if (MBeanInfo.class.isInstance(a)) - name = MBeanInfo.class.cast(a).value(); - } + /** Instance where the MBean interface is implemented by this object. */ + protected AnnotatedMBean(Class mbeanInterface) throws NotCompliantMBeanException { + super(mbeanInterface); } - return name; - } - /** - * Extracts the Method from the MBeanOperationInfo - * @param op - * @return - */ - private Method getMethod(MBeanOperationInfo op) { - final MBeanParameterInfo[] params = op.getSignature(); - final String[] paramTypes = new String[params.length]; - for (int i = 0; i < params.length; i++) - paramTypes[i] = params[i].getType(); + /** {@inheritDoc} */ + @Override + protected String getDescription(MBeanAttributeInfo info) { - return getMethod(getMBeanInterface(), op.getName(), paramTypes); - } + String descr = info.getDescription(); + Method m = getMethod(getMBeanInterface(), "get" + info.getName().substring(0, 1).toUpperCase() + info.getName().substring(1)); + if (m == null) + m = getMethod(getMBeanInterface(), "is" + info.getName().substring(0, 1).toUpperCase() + info.getName().substring(1)); + if (m == null) + m = getMethod(getMBeanInterface(), "does" + info.getName().substring(0, 1).toUpperCase() + info.getName().substring(1)); - /** - * Returns the Method with the specified name and parameter types for the given class, - * null if it doesn't exist. - * @param mbean - * @param method - * @param params - * @return - */ - private static Method getMethod(Class mbean, String method, String... params) { - try { - final ClassLoader loader = mbean.getClassLoader(); - final Class[] paramClasses = new Class[params.length]; - for (int i = 0; i < params.length; i++) { - paramClasses[i] = primitives.get(params[i]); - if (paramClasses[i] == null) - paramClasses[i] = Class.forName(params[i], false, loader); - } - return mbean.getMethod(method, paramClasses); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - return null; + if (m != null) { + MBeanInfo d = m.getAnnotation(MBeanInfo.class); + if (d != null) + descr = d.value(); + } + return descr; + } + + /** {@inheritDoc} */ + @Override + protected String getDescription(MBeanOperationInfo op) { + + String descr = op.getDescription(); + Method m = getMethod(op); + if (m != null) { + MBeanInfo d = m.getAnnotation(MBeanInfo.class); + if (d != null) + descr = d.value(); + } + return descr; + } + + /** {@inheritDoc} */ + @Override + protected String getParameterName(MBeanOperationInfo op, MBeanParameterInfo param, int paramNo) { + String name = param.getName(); + Method m = getMethod(op); + if (m != null) { + for (Annotation a : m.getParameterAnnotations()[paramNo]) { + if (MBeanInfo.class.isInstance(a)) + name = MBeanInfo.class.cast(a).value(); + } + } + return name; + } + + /** + * Extracts the Method from the MBeanOperationInfo + * + * @param op + * @return + */ + private Method getMethod(MBeanOperationInfo op) { + final MBeanParameterInfo[] params = op.getSignature(); + final String[] paramTypes = new String[params.length]; + for (int i = 0; i < params.length; i++) + paramTypes[i] = params[i].getType(); + + return getMethod(getMBeanInterface(), op.getName(), paramTypes); + } + + /** + * Returns the Method with the specified name and parameter types for the + * given class, null if it doesn't exist. + * + * @param mbean + * @param method + * @param params + * @return + */ + private static Method getMethod(Class mbean, String method, String... params) { + try { + final ClassLoader loader = mbean.getClassLoader(); + final Class[] paramClasses = new Class[params.length]; + for (int i = 0; i < params.length; i++) { + paramClasses[i] = primitives.get(params[i]); + if (paramClasses[i] == null) + paramClasses[i] = Class.forName(params[i], false, loader); + } + return mbean.getMethod(method, paramClasses); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + return null; + } } - } @Override public Object invoke(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java new file mode 100644 index 0000000000..57bc15c196 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java @@ -0,0 +1,96 @@ +package org.apache.activemq.broker.jmx; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.management.MBeanException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.ReflectionException; + +/** + * MBean that invokes the requested operation using an async operation and waits for the result + * if the operation times out then an exception is thrown. + */ +public class AsyncAnnotatedMBean extends AnnotatedMBean { + + private ExecutorService executor; + private long timeout = 0; + + public AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl, Class mbeanInterface) throws NotCompliantMBeanException { + super(impl, mbeanInterface); + + this.executor = executor; + this.timeout = timeout; + } + + protected AsyncAnnotatedMBean(Class mbeanInterface) throws NotCompliantMBeanException { + super(mbeanInterface); + } + + protected Object asyncInvole(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException { + return super.invoke(s, objects, strings); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static void registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception { + + if (timeout < 0 && executor != null) { + throw new IllegalArgumentException("async timeout cannot be negative."); + } + + if (timeout > 0 && executor == null) { + throw new NullPointerException("timeout given but no ExecutorService instance given."); + } + + String mbeanName = object.getClass().getName() + "MBean"; + + for (Class c : object.getClass().getInterfaces()) { + if (mbeanName.equals(c.getName())) { + if (timeout == 0) { + context.registerMBean(new AnnotatedMBean(object, c), objectName); + } else { + context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c), objectName); + } + return; + } + } + + context.registerMBean(object, objectName); + } + + @Override + public Object invoke(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException { + + final String action = s; + final Object[] params = objects; + final String[] signature = strings; + + Future task = executor.submit(new Callable() { + + @Override + public Object call() throws Exception { + return asyncInvole(action, params, signature); + } + }); + + try { + return task.get(timeout, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + if (e.getCause() instanceof MBeanException) { + throw (MBeanException) e.getCause(); + } + + throw new MBeanException(e); + } catch (Exception e) { + throw new MBeanException(e); + } finally { + if (!task.isDone()) { + task.cancel(true); + } + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 858fd34522..555e364f06 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import javax.management.InstanceNotFoundException; @@ -103,11 +104,16 @@ public class ManagedRegionBroker extends RegionBroker { /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; + private final ExecutorService asyncInvokeService; + private final long mbeanTimeout; + public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); this.managementContext = context; this.brokerObjectName = brokerObjectName; + this.mbeanTimeout = brokerService.getMbeanInvocationTimeout(); + this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;; } @Override @@ -336,7 +342,7 @@ public class ManagedRegionBroker extends RegionBroker { } } try { - AnnotatedMBean.registerMBean(managementContext, view, key); + AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key); registeredMBeans.add(key); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); @@ -392,7 +398,7 @@ public class ManagedRegionBroker extends RegionBroker { } try { - AnnotatedMBean.registerMBean(managementContext, view, key); + AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key); registeredMBeans.add(key); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); @@ -456,7 +462,7 @@ public class ManagedRegionBroker extends RegionBroker { } try { - AnnotatedMBean.registerMBean(managementContext, view, key); + AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key); registeredMBeans.add(key); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); @@ -535,7 +541,7 @@ public class ManagedRegionBroker extends RegionBroker { SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription); try { - AnnotatedMBean.registerMBean(managementContext, view, objectName); + AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); registeredMBeans.add(objectName); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); @@ -733,7 +739,7 @@ public class ManagedRegionBroker extends RegionBroker { objectName = createObjectName(strategy); if (!registeredMBeans.contains(objectName)) { AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy); - AnnotatedMBean.registerMBean(managementContext, view, objectName); + AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); registeredMBeans.add(objectName); } } catch (Exception e) { @@ -757,7 +763,7 @@ public class ManagedRegionBroker extends RegionBroker { ObjectName objectName = createObjectName(transaction); if (!registeredMBeans.contains(objectName)) { RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction); - AnnotatedMBean.registerMBean(managementContext, view, objectName); + AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); registeredMBeans.add(objectName); } } catch (Exception e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java new file mode 100644 index 0000000000..2307b8271c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MBeanOperationTimeoutTest { + private static final Logger LOG = LoggerFactory.getLogger(MBeanOperationTimeoutTest.class); + + private ActiveMQConnectionFactory connectionFactory; + private BrokerService broker; + private String connectionUri; + private static final String destinationName = "MBeanOperationTimeoutTestQ"; + private static final String moveToDestinationName = "MBeanOperationTimeoutTestQ.Moved"; + + protected MBeanServer mbeanServer; + protected String domain = "org.apache.activemq"; + + protected int messageCount = 50000; + + @Test + public void testLongOperationTimesOut() throws Exception { + + sendMessages(messageCount); + LOG.info("Produced " + messageCount + " messages to the broker."); + + // Now get the QueueViewMBean and purge + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + destinationName + ",BrokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + long count = proxy.getQueueSize(); + assertEquals("Queue size", count, messageCount); + + try { + LOG.info("Attempting to move one message."); + proxy.moveMatchingMessagesTo(null, moveToDestinationName); + fail("Queue purge should have timed out."); + } catch (TimeoutException e) { + LOG.info("Queue message move Timed out as expected."); + } + } + + private void sendMessages(int count) throws Exception { + Connection connection = connectionFactory.createConnection(); + try { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < messageCount; i++) { + Message message = session.createMessage(); + message.setIntProperty("id", i); + producer.send(message); + } + session.commit(); + } finally { + connection.close(); + } + } + + protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { + ObjectName objectName = new ObjectName(name); + if (mbeanServer.isRegistered(objectName)) { + LOG.info("Bean Registered: " + objectName); + } else { + fail("Could not find MBean!: " + objectName); + } + return objectName; + } + + @Before + public void setUp() throws Exception { + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + mbeanServer = broker.getManagementContext().getMBeanServer(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setMbeanInvocationTimeout(TimeUnit.SECONDS.toMillis(1)); + answer.setUseJmx(true); + answer.addConnector("vm://localhost"); + answer.setDeleteAllMessagesOnStartup(true); + return answer; + } +}