This commit is contained in:
Dejan Bosanac 2016-09-19 16:22:36 +02:00
parent a35d23dff7
commit 6630e81379
8 changed files with 328 additions and 9 deletions

View File

@ -83,6 +83,16 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava-version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
<reporting>

View File

@ -19,23 +19,20 @@ package org.apache.activemq.broker.jmx;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.*;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.BrokerSupport;
import org.slf4j.Logger;
@ -288,6 +285,24 @@ public class BrokerView implements BrokerViewMBean {
return safeGetBroker().getQueuesNonSuppressed();
}
@Override
public String queryQueues(String filter, int page, int pageSize) throws IOException {
return DestinationsViewFilter.create(filter)
.setDestinations(safeGetBroker().getQueueViews())
.filter(page, pageSize);
}
@Override
public String queryTopics(String filter, int page, int pageSize) throws IOException {
return DestinationsViewFilter.create(filter)
.setDestinations(safeGetBroker().getTopicViews())
.filter(page, pageSize);
}
public CompositeData[] browseQueue(String queueName) throws OpenDataException, MalformedObjectNameException {
return safeGetBroker().getQueueView(queueName).browse();
}
@Override
public ObjectName[] getTemporaryTopics() {
return safeGetBroker().getTemporaryTopicsNonSuppressed();

View File

@ -16,9 +16,13 @@
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import org.apache.activemq.Service;
@ -178,6 +182,20 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Standard Queues containing AIE messages.")
ObjectName[] getQueues();
/**
* Queue Query API, take a look at {@link DestinationsViewFilter} for more information
*/
@MBeanInfo("Query queues")
String queryQueues(String filter, int page, int pageSize) throws IOException;
/**
* Topic Query API, take a look at {@link DestinationsViewFilter} for more information
*/
@MBeanInfo("Query topics")
String queryTopics(String filter, int page, int pageSize) throws IOException;
public CompositeData[] browseQueue(String queueName) throws OpenDataException, MalformedObjectNameException;
@MBeanInfo("Temporary Topics; generally unused.")
ObjectName[] getTemporaryTopics();

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import static org.apache.activemq.util.IntrospectionSupport.*;
/**
* Defines a query API for destinations MBeans
*
* Typical usage
*
* return DestinationsViewFilter.create(filter)
* .setDestinations(broker.getQueueViews())
* .filter(page, pageSize);
*
* where 'filter' is JSON representation of the query, like
*
* {name: '77', filter:'nonEmpty', sortColumn:'queueSize', sortOrder:'desc'}
*
* This returns a JSON map, containing filtered map of MBeans in the "data" field and total number of destinations that match criteria in the "count" field.
* The result will be properly paged, according to 'page' and 'pageSize' parameters.
*
*/
public class DestinationsViewFilter implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(DestinationsViewFilter.class);
private static final long serialVersionUID = 1L;
/**
* Name pattern used to filter destinations
*/
String name;
/**
* Arbitrary filter key to be applied to the destinations. Currently only simple predefined filters has been implemented:
*
* empty - return only empty queues (queueSize = 0)
* nonEmpty - return only non-empty queues queueSize != 0)
* noConsumer - return only destinations that doesn't have consumers
* nonAdvisory - return only non-Advisory topics
*
* For more implementation details see {@link DestinationsViewFilter.getPredicate}
*
*/
String filter;
/**
* Sort destinations by this {@link DestinationView} property
*/
String sortColumn = "name";
/**
* Order of sorting - 'asc' or 'desc'
*/
String sortOrder = "asc";
Map<ObjectName, DestinationView> destinations;
public DestinationsViewFilter() {
}
/**
* Creates an object from the JSON string
*
*/
public static DestinationsViewFilter create(String json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
if (json == null) {
return new DestinationsViewFilter();
}
json = json.trim();
if (json.length() == 0 || json.equals("{}")) {
return new DestinationsViewFilter();
}
return mapper.readerFor(DestinationsViewFilter.class).readValue(json);
}
/**
* Destination MBeans to be queried
*/
public DestinationsViewFilter setDestinations(Map<ObjectName, DestinationView> destinations) {
this.destinations = destinations;
return this;
}
/**
* Filter, sort and page results.
*
* Returns JSON map with resulting destination views and total number of matched destinations
*
* @param page - defines result page to be returned
* @param pageSize - defines page size to be used
* @throws IOException
*/
String filter(int page, int pageSize) throws IOException {
ObjectMapper mapper = new ObjectMapper();
Map<ObjectName, DestinationView> filtered = getFilteredDestinations(page, pageSize);
Map<String, Object> result = new HashMap<String, Object>();
result.put("data", filtered);
result.put("count", destinations.size());
StringWriter writer = new StringWriter();
mapper.writeValue(writer, result);
return writer.toString();
}
Map<ObjectName, DestinationView> getFilteredDestinations(int page, int pageSize) {
Map<ObjectName, DestinationView> filtered = Maps.filterValues(destinations, getPredicate());
ImmutableMap.Builder<ObjectName, DestinationView> builder = ImmutableMap.builder();
int start = (page - 1) * pageSize;
int end = Math.min(page * pageSize, filtered.size());
int i = 0;
for (Map.Entry<ObjectName, DestinationView> entry :
getOrdering().sortedCopy(filtered.entrySet())) {
if (i >= start && i < end) {
builder.put(entry.getKey(), entry.getValue());
}
i++;
}
return builder.build();
}
Predicate<DestinationView> getPredicate() {
return new Predicate<DestinationView>() {
@Override
public boolean apply(DestinationView input) {
boolean match = true;
if (getName() != null && !getName().isEmpty()) {
match = input.getName().contains(getName());
}
if (match) {
if (getFilter().equals("empty")) {
match = input.getQueueSize() == 0;
}
if (getFilter().equals("nonEmpty")) {
match = input.getQueueSize() != 0;
}
if (getFilter().equals("noConsumer")) {
match = input.getConsumerCount() == 0;
}
if (getFilter().equals("nonAdvisory")) {
return !(input instanceof TopicView && AdvisorySupport.isAdvisoryTopic(new ActiveMQTopic(input.getName())));
}
}
return match;
}
};
}
Ordering<Map.Entry<ObjectName, DestinationView>> getOrdering() {
return new Ordering<Map.Entry<ObjectName, DestinationView>>() {
Method getter = findGetterMethod(DestinationView.class, getSortColumn());
@Override
public int compare(Map.Entry<ObjectName, DestinationView> left, Map.Entry<ObjectName, DestinationView> right) {
try {
if (getter != null) {
Object leftValue = getter.invoke(left.getValue());
Object rightValue = getter.invoke(right.getValue());
if (leftValue instanceof Comparable && rightValue instanceof Comparable) {
if (getSortOrder().toLowerCase().equals("desc")) {
return ((Comparable) rightValue).compareTo(leftValue);
} else {
return ((Comparable) leftValue).compareTo(rightValue);
}
}
}
return 0;
} catch (Exception e) {
LOG.info("Exception sorting destinations", e);
return 0;
}
}
};
}
public Map<ObjectName, DestinationView> getDestinations() {
return destinations;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getFilter() {
return filter;
}
public void setFilter(String filter) {
this.filter = filter;
}
public String getSortOrder() {
return sortOrder;
}
public void setSortOrder(String sortOrder) {
this.sortOrder = sortOrder;
}
public String getSortColumn() {
return sortColumn;
}
public void setSortColumn(String sortColumn) {
this.sortColumn = sortColumn;
}
}

View File

@ -828,4 +828,13 @@ public class ManagedRegionBroker extends RegionBroker {
public Map<ObjectName, DestinationView> getQueueViews() {
return queues;
}
public Map<ObjectName, DestinationView> getTopicViews() {
return topics;
}
public DestinationView getQueueView(String queueName) throws MalformedObjectNameException {
ObjectName objName = BrokerMBeanSupport.createDestinationName(brokerObjectName.toString(), "Queue", queueName);
return queues.get(objName);
}
}

View File

@ -255,7 +255,7 @@ public final class IntrospectionSupport {
}
}
private static Method findSetterMethod(Class clazz, String name) {
public static Method findSetterMethod(Class clazz, String name) {
// Build the method name.
name = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
Method[] methods = clazz.getMethods();
@ -268,6 +268,19 @@ public final class IntrospectionSupport {
return null;
}
public static Method findGetterMethod(Class clazz, String name) {
// Build the method name.
name = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
Method[] methods = clazz.getMethods();
for (Method method : methods) {
Class<?> params[] = method.getParameterTypes();
if (method.getName().equals(name) && params.length == 0 ) {
return method;
}
}
return null;
}
public static String toString(Object target) {
return toString(target, Object.class, null);
}

View File

@ -340,6 +340,11 @@
<artifactId>derbynet</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava-version}</version>
</dependency>
<!-- copied dependencies from activemq-web-console -->
<!-- enable commons-logging when inside jetty6:run -->

View File

@ -63,6 +63,7 @@
<directory-version>2.0.0-M6</directory-version>
<ftpserver-version>1.0.6</ftpserver-version>
<geronimo-version>1.0</geronimo-version>
<guava-version>19.0</guava-version>
<hadoop-version>1.0.0</hadoop-version>
<hawtbuf-version>1.11</hawtbuf-version>
<hawtdispatch-version>1.22</hawtdispatch-version>