ARTEMIS-1349 Add management views for client objects

This commit is contained in:
Martyn Taylor 2017-08-14 16:39:55 +01:00
parent f2f4708eba
commit bb3965b7f7
35 changed files with 1541 additions and 12 deletions

View File

@ -1075,5 +1075,35 @@ public interface ActiveMQServerControl {
@Operation(desc = "List Addresses on the broker", impact = MBeanOperationInfo.INFO) @Operation(desc = "List Addresses on the broker", impact = MBeanOperationInfo.INFO)
String listAddresses(@Parameter(name = "separator", desc = "Separator used on the string listing") String separator) throws Exception; String listAddresses(@Parameter(name = "separator", desc = "Separator used on the string listing") String separator) throws Exception;
@Operation(desc = "Search for Connections", impact = MBeanOperationInfo.INFO)
String listConnections(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception;
@Operation(desc = "Search for Sessions", impact = MBeanOperationInfo.INFO)
String listSessions(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception;
@Operation(desc = "Search for Consumers", impact = MBeanOperationInfo.INFO)
String listConsumers(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception;
@Operation(desc = "Search for Consumers", impact = MBeanOperationInfo.INFO)
String listProducers(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception;
@Operation(desc = "Search for Addresses", impact = MBeanOperationInfo.INFO)
String listAddresses(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception;
@Operation(desc = "Search for Queues", impact = MBeanOperationInfo.INFO)
String listQueues(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception;
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
@ -347,6 +348,16 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
return false; return false;
} }
/**
* Returns the name of the protocol for this Remoting Connection
*
* @return
*/
@Override
public String getProtocolName() {
return ActiveMQClient.DEFAULT_CORE_PROTOCOL;
}
// Buffer Handler implementation // Buffer Handler implementation
// ---------------------------------------------------- // ----------------------------------------------------
@Override @Override
@ -400,14 +411,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
} }
} }
public void setClientID(String cID) {
clientID = cID;
}
public String getClientID() {
return clientID;
}
@Override @Override
public void killMessage(SimpleString nodeID) { public void killMessage(SimpleString nodeID) {
if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) { if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {

View File

@ -44,6 +44,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
protected final Executor executor; protected final Executor executor;
protected final long creationTime; protected final long creationTime;
protected volatile boolean dataReceived; protected volatile boolean dataReceived;
private String clientId;
public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) { public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) {
this.transportConnection = transportConnection; this.transportConnection = transportConnection;
@ -226,4 +227,14 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
public Subject getSubject() { public Subject getSubject() {
return null; return null;
} }
@Override
public void setClientID(String clientId) {
this.clientId = clientId;
}
@Override
public String getClientID() {
return clientId;
}
} }

View File

@ -214,4 +214,22 @@ public interface RemotingConnection extends BufferHandler {
* @return * @return
*/ */
Subject getSubject(); Subject getSubject();
/**
* Returns the name of the protocol for this Remoting Connection
* @return
*/
String getProtocolName();
/**
* Sets the client ID associated with this connection
* @return
*/
void setClientID(String cID);
/**
* Returns the Client ID associated with this connection
* @return
*/
String getClientID();
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -30,7 +31,7 @@ public class StringUtil {
* @param delimit the delimiter used to separate each string entry in the list * @param delimit the delimiter used to separate each string entry in the list
* @return the converted string * @return the converted string
*/ */
public static String joinStringList(List<String> strList, String delimit) { public static String joinStringList(Collection<String> strList, String delimit) {
Iterator<String> entries = strList.iterator(); Iterator<String> entries = strList.iterator();
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
@ -673,4 +674,12 @@ public class AMQPSessionCallback implements SessionCallback {
public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
protonSPI.invokeOutgoingInterceptors(message, connection); protonSPI.invokeOutgoingInterceptors(message, connection);
} }
public void addProducer(ServerProducer serverProducer) {
serverSession.addProducer(serverProducer);
}
public void removeProducer(String name) {
serverSession.removeProducer(name);
}
} }

View File

@ -160,4 +160,19 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
} }
return null; return null;
} }
/**
* Returns the name of the protocol for this Remoting Connection
*
* @return
*/
@Override
public String getProtocolName() {
return ProtonProtocolManagerFactory.AMQP_PROTOCOL_NAME;
}
@Override
public String getClientID() {
return amqpConnection.getContainer();
}
} }

View File

@ -34,7 +34,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
public static final byte ID = 2; public static final byte ID = 2;
private static final String AMQP_PROTOCOL_NAME = "AMQP"; public static final String AMQP_PROTOCOL_NAME = "AMQP";
private static final String MODULE_NAME = "artemis-amqp-protocol"; private static final String MODULE_NAME = "artemis-amqp-protocol";

View File

@ -21,6 +21,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -138,6 +140,7 @@ public class AMQPSessionContext extends ProtonInitializable {
} }
public void removeReceiver(Receiver receiver) { public void removeReceiver(Receiver receiver) {
sessionSPI.removeProducer(receiver.getName());
receivers.remove(receiver); receivers.remove(receiver);
} }
@ -200,6 +203,8 @@ public class AMQPSessionContext extends ProtonInitializable {
ProtonServerReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver); ProtonServerReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
protonReceiver.initialise(); protonReceiver.initialise();
receivers.put(receiver, protonReceiver); receivers.put(receiver, protonReceiver);
ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
sessionSPI.addProducer(serverProducer);
receiver.setContext(protonReceiver); receiver.setContext(protonReceiver);
connection.lock(); connection.lock();
try { try {

View File

@ -45,6 +45,8 @@ public class MQTTConnection implements RemotingConnection {
private boolean connected; private boolean connected;
private String clientID;
private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>()); private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>());
private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>()); private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>());
@ -233,4 +235,35 @@ public class MQTTConnection implements RemotingConnection {
public Subject getSubject() { public Subject getSubject() {
return null; return null;
} }
/**
* Returns the name of the protocol for this Remoting Connection
*
* @return
*/
@Override
public String getProtocolName() {
return MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME;
}
/**
* Sets the client ID associated with this connection
*
* @param cID
* @return
*/
@Override
public void setClientID(String cID) {
this.clientID = cID;
}
/**
* Returns the Client ID associated with this connection
*
* @return
*/
@Override
public String getClientID() {
return clientID;
}
} }

View File

@ -98,6 +98,7 @@ public class MQTTConnectionManager {
session.getConnection().setConnected(true); session.getConnection().setConnected(true);
session.start(); session.start();
session.getConnection().setClientID(clientId);
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED); session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
} }

View File

@ -68,6 +68,7 @@ public class MQTTPublishManager {
} }
synchronized void stop() throws Exception { synchronized void stop() throws Exception {
session.getServerSession().removeProducer(session.getServerSession().getName());
if (managementConsumer != null) { if (managementConsumer != null) {
managementConsumer.removeItself(); managementConsumer.removeItself();
managementConsumer.setStarted(false); managementConsumer.setStarted(false);

View File

@ -1609,4 +1609,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
//unsupported //unsupported
} }
@Override
public String getProtocolName() {
return OpenWireProtocolManagerFactory.OPENWIRE_PROTOCOL_NAME;
}
@Override
public String getClientID() {
return context.getClientId();
}
} }

View File

@ -421,10 +421,12 @@ public final class StompConnection implements RemotingConnection {
return passcode; return passcode;
} }
@Override
public void setClientID(String clientID) { public void setClientID(String clientID) {
this.clientID = clientID; this.clientID = clientID;
} }
@Override
public String getClientID() { public String getClientID() {
return clientID; return clientID;
} }
@ -786,4 +788,14 @@ public final class StompConnection implements RemotingConnection {
public Subject getSubject() { public Subject getSubject() {
return null; return null;
} }
/**
* Returns the name of the protocol for this Remoting Connection
*
* @return
*/
@Override
public String getProtocolName() {
return StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
}
} }

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
@ -64,6 +65,12 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.management.impl.view.AddressView;
import org.apache.activemq.artemis.core.management.impl.view.ConnectionView;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
import org.apache.activemq.artemis.core.management.impl.view.ProducerView;
import org.apache.activemq.artemis.core.management.impl.view.QueueView;
import org.apache.activemq.artemis.core.management.impl.view.SessionView;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -86,6 +93,7 @@ import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
@ -1575,6 +1583,109 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
return producers.build().toString(); return producers.build().toString();
} }
@Override
public String listConnections(String options, int page, int pageSize) throws Exception {
checkStarted();
clearIO();
try {
server.getPostOffice().getAddresses();
ConnectionView view = new ConnectionView(server);
view.setCollection(server.getRemotingService().getConnections());
view.setOptions(options);
return view.getResultsAsJson(page, pageSize);
} finally {
blockOnIO();
}
}
@Override
public String listSessions(String options, int page, int pageSize) throws Exception {
checkStarted();
clearIO();
try {
SessionView view = new SessionView();
view.setCollection(server.getSessions());
view.setOptions(options);
return view.getResultsAsJson(page, pageSize);
} finally {
blockOnIO();
}
}
@Override
public String listConsumers(String options, int page, int pageSize) throws Exception {
checkStarted();
clearIO();
try {
Set<ServerConsumer> consumers = new HashSet();
for (ServerSession session : server.getSessions()) {
consumers.addAll(session.getServerConsumers());
}
ConsumerView view = new ConsumerView(server);
view.setCollection(consumers);
view.setOptions(options);
return view.getResultsAsJson(page, pageSize);
} finally {
blockOnIO();
}
}
@Override
public String listAddresses(String options, int page, int pageSize) throws Exception {
checkStarted();
clearIO();
try {
final Set<SimpleString> addresses = server.getPostOffice().getAddresses();
AddressView view = new AddressView(server);
view.setCollection(addresses);
view.setOptions(options);
return view.getResultsAsJson(page, pageSize);
} finally {
blockOnIO();
}
}
@Override
public String listQueues(String options, int page, int pageSize) throws Exception {
checkStarted();
clearIO();
try {
List<QueueControl> queues = new ArrayList<>();
Object[] qs = server.getManagementService().getResources(QueueControl.class);
for (int i = 0; i < qs.length; i++) {
queues.add((QueueControl) qs[i]);
}
QueueView view = new QueueView(server);
view.setCollection(queues);
view.setOptions(options);
return view.getResultsAsJson(page, pageSize);
} finally {
blockOnIO();
}
}
@Override
public String listProducers(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception {
checkStarted();
clearIO();
try {
Set<ServerProducer> producers = new HashSet<>();
for (ServerSession session : server.getSessions()) {
producers.addAll(session.getServerProducers().values());
}
ProducerView view = new ProducerView(server);
view.setCollection(producers);
view.setOptions(options);
return view.getResultsAsJson(page, pageSize);
} finally {
blockOnIO();
}
}
@Override @Override
public String listConnectionsAsJSON() throws Exception { public String listConnectionsAsJSON() throws Exception {
checkStarted(); checkStarted();

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.core.management.impl.view.predicate.ActiveMQFilterPredicate;
import org.apache.activemq.artemis.utils.JsonLoader;
public abstract class ActiveMQAbstractView<T> {
private static final String FILTER_FIELD = "field";
private static final String FILTER_OPERATION = "operation";
private static final String FILTER_VALUE = "value";
protected Collection<T> collection;
protected ActiveMQFilterPredicate<T> predicate;
protected String sortColumn;
protected String sortOrder;
protected String options;
private Method getter;
public ActiveMQAbstractView() {
this.sortColumn = getDefaultOrderColumn();
this.sortOrder = "asc";
}
public void setCollection(Collection<T> collection) {
this.collection = collection;
}
public String getResultsAsJson(int page, int pageSize) {
JsonObjectBuilder obj = JsonLoader.createObjectBuilder();
JsonArrayBuilder array = JsonLoader.createArrayBuilder();
collection = Collections2.filter(collection, getPredicate());
for (T element : getPagedResult(page, pageSize)) {
array.add(toJson(element));
}
obj.add("data", array);
obj.add("count", collection.size());
return obj.build().toString();
}
public List<T> getPagedResult(int page, int pageSize) {
ImmutableList.Builder<T> builder = ImmutableList.builder();
int start = (page - 1) * pageSize;
int end = Math.min(page * pageSize, collection.size());
int i = 0;
for (T e : getOrdering().sortedCopy(collection)) {
if (i >= start && i < end) {
builder.add(e);
}
i++;
}
return builder.build();
}
public Predicate getPredicate() {
return predicate;
}
private Method getGetter() {
if (getter == null) {
getter = findGetterMethod(getClassT(), sortColumn);
}
return getter;
}
public Ordering<T> getOrdering() {
return new Ordering<T>() {
@Override
public int compare(T left, T right) {
Method getter = getGetter();
try {
if (getter != null) {
Object leftValue = getter.invoke(left);
Object rightValue = getter.invoke(right);
if (leftValue instanceof Comparable && rightValue instanceof Comparable) {
if (sortOrder.equals("desc")) {
return ((Comparable) rightValue).compareTo(leftValue);
} else {
return ((Comparable) leftValue).compareTo(rightValue);
}
}
}
return 0;
} catch (Exception e) {
//LOG.info("Exception sorting destinations", e);
return 0;
}
}
};
}
public static Method findGetterMethod(Class clazz, String sortColumn) {
String name = "get" + Character.toUpperCase(sortColumn.charAt(0)) + sortColumn.substring(1);
Method[] methods = clazz.getMethods();
for (Method method : methods) {
Class<?>[] params = method.getParameterTypes();
if (method.getName().equals(name) && params.length == 0) {
return method;
}
}
return null;
}
public void setOptions(String options) {
JsonObject json = JsonUtil.readJsonObject(options);
if (predicate != null) {
predicate.setField(json.getString(FILTER_FIELD));
predicate.setOperation(json.getString(FILTER_OPERATION));
predicate.setValue(json.getString(FILTER_VALUE));
}
}
public abstract Class getClassT();
public abstract JsonObjectBuilder toJson(T obj);
public abstract String getDefaultOrderColumn();
/**
* JsonObjectBuilder will throw an NPE if a null value is added. For this reason we check for null explicitly when
* adding objects.
*
* @param o
* @return
*/
protected String toString(Object o) {
return o == null ? "" : o.toString();
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonObjectBuilder;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.management.impl.view.predicate.AddressFilterPredicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.JsonLoader;
public class AddressView extends ActiveMQAbstractView<SimpleString> {
private static final String defaultSortColumn = "creationTime";
private final ActiveMQServer server;
public AddressView(ActiveMQServer server) {
super();
this.server = server;
this.predicate = new AddressFilterPredicate(server);
}
@Override
public Class getClassT() {
return AddressInfo.class;
}
@Override
public JsonObjectBuilder toJson(SimpleString addressName) {
AddressInfo address = server.getAddressInfo(addressName);
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("id", toString(address.getId())).add("name", toString(address.getName())).add("routingTypes", toString(address.getRoutingTypes()));
try {
obj.add("queueCount", toString(server.bindingQuery(address.getName()).getQueueNames().size()));
return obj;
} catch (Exception e) {
obj.add("queueCount", 0);
}
return obj;
}
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonObjectBuilder;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.core.management.impl.view.predicate.ConnectionFilterPredicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.StringUtil;
public class ConnectionView extends ActiveMQAbstractView<RemotingConnection> {
private static final String defaultSortColumn = "creationTime";
private final ActiveMQServer server;
public ConnectionView(ActiveMQServer server) {
super();
this.server = server;
this.predicate = new ConnectionFilterPredicate(server);
}
@Override
public Class getClassT() {
return RemotingConnection.class;
}
@Override
public JsonObjectBuilder toJson(RemotingConnection connection) {
List<ServerSession> sessions = server.getSessions(connection.getID().toString());
Set<String> users = new HashSet<>();
for (ServerSession session : sessions) {
String username = session.getUsername() == null ? "" : session.getUsername();
users.add(username);
}
return JsonLoader.createObjectBuilder().add("connectionID", toString(connection.getID())).add("remoteAddress", toString(connection.getRemoteAddress())).add("users", StringUtil.joinStringList(users, ",")).add("creationTime", new Date(connection.getCreationTime()).toString()).add("implementation", toString(toString(connection.getClass().getSimpleName()))).add("protocol", toString(connection.getProtocolName())).add("clientID", toString(connection.getClientID())).add("localAddress", toString(connection.getTransportConnection().getLocalAddress())).add("sessionCount", server.getSessions(connection.getID().toString()).size());
}
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonObjectBuilder;
import java.util.Date;
import org.apache.activemq.artemis.core.management.impl.view.predicate.ConsumerFilterPredicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.utils.JsonLoader;
public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
private static final String defaultSortColumn = "creationTime";
private final ActiveMQServer server;
public ConsumerView(ActiveMQServer server) {
super();
this.server = server;
this.predicate = new ConsumerFilterPredicate(server);
}
@Override
public Class getClassT() {
return ServerConsumer.class;
}
@Override
public JsonObjectBuilder toJson(ServerConsumer consumer) {
ServerSession session = server.getSessionByID(consumer.getSessionID());
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("id", toString(consumer.sequentialID())).add("session", toString(session.getName())).add("clientID", toString(session.getRemotingConnection().getClientID())).add("user", toString(session.getUsername())).add("protocol", toString(session.getRemotingConnection().getProtocolName())).add("queue", toString(consumer.getQueue().getName())).add("queueType", toString(consumer.getQueue().getRoutingType()).toLowerCase()).add("address", toString(consumer.getQueue().getAddress().toString())).add("localAddress", toString(session.getRemotingConnection().getTransportConnection().getLocalAddress())).add("remoteAddress", toString(session.getRemotingConnection().getTransportConnection().getRemoteAddress())).add("creationTime", new Date(consumer.getCreationTime()).toString());
return obj;
}
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonObjectBuilder;
import org.apache.activemq.artemis.core.management.impl.view.predicate.ProducerFilterPredicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.utils.JsonLoader;
public class ProducerView extends ActiveMQAbstractView<ServerProducer> {
private static final String defaultSortColumn = "creationTime";
private final ActiveMQServer server;
public ProducerView(ActiveMQServer server) {
super();
this.server = server;
this.predicate = new ProducerFilterPredicate(server);
}
@Override
public Class getClassT() {
return ServerProducer.class;
}
@Override
public JsonObjectBuilder toJson(ServerProducer producer) {
ServerSession session = server.getSessionByID(producer.getSessionID());
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("id", toString(producer.getID())).add("session", toString(session.getName())).add("clientID", toString(session.getRemotingConnection().getClientID())).add("user", toString(session.getUsername())).add("protocol", toString(session.getRemotingConnection().getProtocolName())).add("address", toString(producer.getAddress() != null ? producer.getAddress() : session.getDefaultAddress())).add("localAddress", toString(session.getRemotingConnection().getTransportConnection().getLocalAddress())).add("remoteAddress", toString(session.getRemotingConnection().getTransportConnection().getRemoteAddress())).add("creationTime", toString(producer.getCreationTime()));
return obj;
}
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonObjectBuilder;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.management.impl.view.predicate.QueueFilterPredicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.JsonLoader;
public class QueueView extends ActiveMQAbstractView<QueueControl> {
private static final String defaultSortColumn = "name";
private ActiveMQServer server;
public QueueView(ActiveMQServer server) {
super();
this.predicate = new QueueFilterPredicate(server);
this.server = server;
}
@Override
public Class getClassT() {
return QueueControl.class;
}
@Override
public JsonObjectBuilder toJson(QueueControl queue) {
Queue q = server.locateQueue(new SimpleString(queue.getName()));
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("id", toString(queue.getID())).add("name", toString(queue.getName())).add("address", toString(queue.getAddress())).add("filter", toString(queue.getFilter())).add("rate", toString(q.getRate())).add("durable", toString(queue.isDurable())).add("paused", toString(q.isPaused())).add("temporary", toString(queue.isTemporary())).add("purgeOnNoConsumers", toString(queue.isPurgeOnNoConsumers())).add("consumerCount", toString(queue.getConsumerCount())).add("maxConsumers", toString(queue.getMaxConsumers())).add("autoCreated", toString(q.isAutoCreated())).add("user", toString(q.getUser())).add("routingType", toString(queue.getRoutingType())).add("messagesAdded", toString(queue.getMessagesAdded())).add("messageCount", toString(queue.getMessageCount())).add("messagesAcked", toString(queue.getMessagesAcknowledged())).add("deliveringCount", toString(queue.getDeliveringCount())).add("messagesKilled", toString(queue.getMessagesKilled())).add("deliverDeliver", toString(q.isDirectDeliver()));
return obj;
}
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view;
import javax.json.JsonObjectBuilder;
import java.util.Date;
import org.apache.activemq.artemis.core.management.impl.view.predicate.SessionFilterPredicate;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.utils.JsonLoader;
public class SessionView extends ActiveMQAbstractView<ServerSession> {
private static final String defaultSortColumn = "creationTime";
public SessionView() {
super();
this.predicate = new SessionFilterPredicate();
}
@Override
public Class getClassT() {
return ServerSession.class;
}
@Override
public JsonObjectBuilder toJson(ServerSession session) {
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("id", toString(session.getName())).add("user", toString(session.getUsername())).add("creationTime", new Date(session.getCreationTime()).toString()).add("consumerCount", session.getServerConsumers().size()).add("producerCount", session.getServerProducers().size()).add("connectionID", session.getConnectionID().toString());
return obj;
}
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import java.util.Collection;
import com.google.common.base.Predicate;
public class ActiveMQFilterPredicate<T> implements Predicate<T> {
enum Operation {
CONTAINS, EQUALS;
}
protected String field;
protected String value;
protected Operation operation;
public static boolean contains(String field, String value) {
return field.contains(value);
}
public ActiveMQFilterPredicate() {
}
@Override
public boolean apply(T input) {
return true;
}
public String getField() {
return field;
}
public void setField(String field) {
this.field = field;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public Operation getOperation() {
return operation;
}
public void setOperation(String operation) {
if (operation != null && !operation.equals("")) {
this.operation = Operation.valueOf(operation);
}
}
public boolean matches(Object field) {
if (operation != null) {
switch (operation) {
case EQUALS:
return equals(field, value);
case CONTAINS:
return contains(field, value);
}
}
return true;
}
public boolean matchAny(Collection objects) {
for (Object o : objects) {
if (matches(o))
return true;
}
return false;
}
private boolean equals(Object field, Object value) {
if (field == null) {
return (value.equals("") || value == null);
}
return field.toString().equals(value);
}
private boolean contains(Object field, Object value) {
if (field == null) {
return (value.equals("") || value == null);
}
return field.toString().contains(value.toString());
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class AddressFilterPredicate extends ActiveMQFilterPredicate<SimpleString> {
enum Field {
ID, NAME, ROUTING_TYPES, PRODUCER_ID, QUEUE_COUNT
}
private Field f;
private final ActiveMQServer server;
public AddressFilterPredicate(ActiveMQServer server) {
super();
this.server = server;
}
@Override
public boolean apply(SimpleString addressName) {
AddressInfo address = server.getAddressInfo(addressName);
if (f == null)
return true;
try {
switch (f) {
case ID:
return matches(address.getId());
case NAME:
return matches(address.getName());
case ROUTING_TYPES:
return matchAny(address.getRoutingTypes());
case PRODUCER_ID:
return matches("TODO");
case QUEUE_COUNT:
return matches(server.bindingQuery(address.getName()).getQueueNames().size());
}
} catch (Exception e) {
return false;
}
return true;
}
@Override
public void setField(String field) {
if (field != null && !field.equals("")) {
this.f = Field.valueOf(field.toUpperCase());
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class ConnectionFilterPredicate extends ActiveMQFilterPredicate<RemotingConnection> {
enum Field {
CONNECTION_ID, CLIENT_ID, USERS, PROTOCOL, SESSION_COUNT, REMOTE_ADDRESS, LOCAL_ADDRESS, SESSION_ID
}
private Field f;
private ActiveMQServer server;
public ConnectionFilterPredicate(ActiveMQServer server) {
this.server = server;
}
@Override
public boolean apply(RemotingConnection connection) {
// Using switch over enum vs string comparison is better for perf.
if (f == null)
return true;
switch (f) {
case CONNECTION_ID:
return matches(connection.getID());
case CLIENT_ID:
return matches(connection.getClientID());
case USERS:
List<ServerSession> sessions = server.getSessions(connection.getID().toString());
Set<String> users = new HashSet<>();
for (ServerSession session : sessions) {
String username = session.getUsername() == null ? "" : session.getUsername();
users.add(username);
}
return matchAny(users);
case PROTOCOL:
return matches(connection.getProtocolName());
case SESSION_COUNT:
return matches(server.getSessions(connection.getID().toString()).size());
case REMOTE_ADDRESS:
return matches(connection.getTransportConnection().getRemoteAddress());
case LOCAL_ADDRESS:
return matches(connection.getTransportConnection().getLocalAddress());
case SESSION_ID:
return matchAny(server.getSessions(connection.getID().toString()));
}
return true;
}
@Override
public void setField(String field) {
if (field != null && !field.equals("")) {
this.f = Field.valueOf(field.toUpperCase());
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerConsumer;
public class ConsumerFilterPredicate extends ActiveMQFilterPredicate<ServerConsumer> {
enum Field {
ID, SESSION_ID, QUEUE, ADDRESS, USER, PROTOCOL, CLIENT_ID, LOCAL_ADDRESS, REMOTE_ADDRESS
}
private Field f;
private final ActiveMQServer server;
public ConsumerFilterPredicate(ActiveMQServer server) {
super();
this.server = server;
}
@Override
public boolean apply(ServerConsumer consumer) {
// Using switch over enum vs string comparison is better for perf.
if (f == null)
return true;
switch (f) {
case ID:
return matches(consumer.getID());
case SESSION_ID:
return matches(consumer.getSessionID());
case USER:
return matches(server.getSessionByID(consumer.getSessionID()).getUsername());
case ADDRESS:
return matches(consumer.getQueue().getAddress());
case QUEUE:
return matches(consumer.getQueue().getName());
case PROTOCOL:
return matches(server.getSessionByID(consumer.getSessionID()).getRemotingConnection().getProtocolName());
case CLIENT_ID:
return matches(server.getSessionByID(consumer.getSessionID()).getRemotingConnection().getClientID());
case LOCAL_ADDRESS:
return matches(server.getSessionByID(consumer.getSessionID()).getRemotingConnection().getTransportConnection().getLocalAddress());
case REMOTE_ADDRESS:
return matches(server.getSessionByID(consumer.getSessionID()).getRemotingConnection().getTransportConnection().getRemoteAddress());
}
return true;
}
@Override
public void setField(String field) {
if (field != null && !field.equals("")) {
this.f = Field.valueOf(field.toUpperCase());
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerProducer;
public class ProducerFilterPredicate extends ActiveMQFilterPredicate<ServerProducer> {
enum Field {
ID, SESSION_ID, CONNECTION_ID, ADDRESS, USER, PROTOCOL, CLIENT_ID, LOCAL_ADDRESS, REMOTE_ADDRESS
}
private Field f;
private final ActiveMQServer server;
public ProducerFilterPredicate(ActiveMQServer server) {
super();
this.server = server;
}
@Override
public boolean apply(ServerProducer producer) {
// Using switch over enum vs string comparison is better for perf.
if (f == null)
return true;
switch (f) {
case ID:
return matches(producer.getID());
case CONNECTION_ID:
return matches(producer.getConnectionID());
case SESSION_ID:
return matches(producer.getSessionID());
case USER:
return matches(server.getSessionByID(producer.getSessionID()).getUsername());
case ADDRESS:
return matches(producer.getAddress() != null ? producer.getAddress() : server.getSessionByID(producer.getSessionID()).getDefaultAddress());
case PROTOCOL:
return matches(producer.getProtocol());
case CLIENT_ID:
return matches(server.getSessionByID(producer.getSessionID()).getRemotingConnection().getClientID());
case LOCAL_ADDRESS:
return matches(server.getSessionByID(producer.getSessionID()).getRemotingConnection().getTransportConnection().getLocalAddress());
case REMOTE_ADDRESS:
return matches(server.getSessionByID(producer.getSessionID()).getRemotingConnection().getTransportConnection().getRemoteAddress());
default: return true;
}
}
@Override
public void setField(String field) {
if (field != null && !field.equals("")) {
this.f = Field.valueOf(field.toUpperCase());
}
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
public class QueueFilterPredicate extends ActiveMQFilterPredicate<QueueControl> {
enum Field {
ID, NAME, CONSUMER_ID, QUEUE, ADDRESS, MAX_CONSUMERS, FILTER, MESSAGE_COUNT, CONSUMER_COUNT, DELIVERING_COUNT,
MESSAGES_ADDED, MESSAGES_ACKED, RATE, ROUTING_TYPE, USER, AUTO_CREATED, DURABLE, PAUSED, TEMPORARY,
PURGE_ON_NO_CONSUMERS, MESSAGES_KILLED, DIRECT_DELIVER
}
private Field f;
private ActiveMQServer server;
public QueueFilterPredicate(ActiveMQServer server) {
super();
this.server = server;
}
@Override
public boolean apply(QueueControl queue) {
// Using switch over enum vs string comparison is better for perf.
try {
if (f == null)
return true;
switch (f) {
case ID:
return matches(queue.getID());
case NAME:
return matches(queue.getName());
case CONSUMER_ID:
Queue q = server.locateQueue(new SimpleString(queue.getName()));
for (Consumer consumer : q.getConsumers()) {
if (value.equals(consumer.sequentialID()))
return true;
}
return false;
case MAX_CONSUMERS:
return matches(queue.getMaxConsumers());
case ADDRESS:
return matches(queue.getAddress());
case FILTER:
return matches(queue.getFilter());
case MESSAGE_COUNT:
return matches(queue.getMessageCount());
case CONSUMER_COUNT:
return matches(queue.getConsumerCount());
case DELIVERING_COUNT:
return matches(queue.getDeliveringCount());
case MESSAGES_ADDED:
return matches(queue.getMessagesAdded());
case MESSAGES_ACKED:
return matches(queue.getMessagesAcknowledged());
case RATE:
return matches(queue.getMessagesExpired());
case ROUTING_TYPE:
return matches(queue.getRoutingType());
case AUTO_CREATED:
return matches(server.locateQueue(new SimpleString(queue.getName())).isAutoCreated());
case DURABLE:
return matches(queue.isDurable());
case PAUSED:
return matches(queue.isPaused());
case TEMPORARY:
return matches(queue.isTemporary());
case PURGE_ON_NO_CONSUMERS:
return matches(queue.isPurgeOnNoConsumers());
case MESSAGES_KILLED:
return matches(queue.getMessagesKilled());
default:
return true;
}
} catch (Exception e) {
return true;
}
}
@Override
public void setField(String field) {
if (field != null && !field.equals("")) {
this.f = Field.valueOf(field.toUpperCase());
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.view.predicate;
import org.apache.activemq.artemis.core.server.ServerSession;
public class SessionFilterPredicate extends ActiveMQFilterPredicate<ServerSession> {
enum Field {
ID, CONNECTION_ID, CONSUMER_COUNT, PRODUCER_COUNT, USER, PROTOCOL, CLIENT_ID, LOCAL_ADDRESS, REMOTE_ADDRESS
}
private Field f;
public SessionFilterPredicate() {
super();
}
@Override
public boolean apply(ServerSession session) {
// Using switch over enum vs string comparison is better for perf.
if (f == null)
return true;
switch (f) {
case ID:
return matches(session.getName());
case CONNECTION_ID:
return matches(session.getConnectionID());
case CONSUMER_COUNT:
return matches(session.getServerConsumers().size());
case PRODUCER_COUNT:
return matches(session.getServerProducers().size());
case PROTOCOL:
return matches(session.getRemotingConnection().getProtocolName());
case CLIENT_ID:
return matches(session.getRemotingConnection().getClientID());
case LOCAL_ADDRESS:
return matches(session.getRemotingConnection().getTransportConnection().getLocalAddress());
case REMOTE_ADDRESS:
return matches(session.getRemotingConnection().getTransportConnection().getRemoteAddress());
default: return true;
}
}
@Override
public void setField(String field) {
if (field != null && !field.equals("")) {
this.f = Field.valueOf(field.toUpperCase());
}
}
}

View File

@ -44,7 +44,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.core.version.Version;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -168,9 +170,9 @@ public class ActiveMQPacketHandler implements ChannelHandler {
} }
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection); CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap); ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap);
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
session.addProducer(serverProducer);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel);
channel.setHandler(handler); channel.setHandler(handler);
sessionCallback.setSessionHandler(handler); sessionCallback.setSessionHandler(handler);

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
public interface ServerProducer {
String getAddress();
String getProtocol();
void setSessionID(String sessionID);
void setConnectionID(String connectionID);
String getSessionID();
String getConnectionID();
String getID();
long getCreationTime();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import javax.json.JsonArrayBuilder; import javax.json.JsonArrayBuilder;
@ -292,4 +293,12 @@ public interface ServerSession extends SecurityAuth {
*/ */
Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address, Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes); Set<RoutingType> defaultRoutingTypes);
void addProducer(ServerProducer serverProducer);
void removeProducer(String ID);
Map<String, ServerProducer> getServerProducers();
String getDefaultAddress();
} }

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.ServerProducer;
public class ServerProducerImpl implements ServerProducer {
private final String ID;
private final String protocol;
private final long creationTime;
private final String address;
private String sessionID;
private String connectionID;
public ServerProducerImpl(String ID, String protocol, String address) {
this.ID = ID;
this.protocol = protocol;
this.address = address;
this.creationTime = System.currentTimeMillis();
}
@Override
public String getAddress() {
return address;
}
@Override
public String getProtocol() {
return protocol;
}
@Override
public void setSessionID(String sessionID) {
this.sessionID = sessionID;
}
@Override
public void setConnectionID(String connectionID) {
this.connectionID = connectionID;
}
@Override
public String getSessionID() {
return sessionID;
}
@Override
public String getConnectionID() {
return connectionID;
}
@Override
public String getID() {
return ID;
}
@Override
public long getCreationTime() {
return creationTime;
}
}

View File

@ -74,6 +74,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.TempQueueObserver; import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
@ -126,6 +127,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<>(); protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<>();
protected final Map<String, ServerProducer> producers = new ConcurrentHashMap<>();
protected Transaction tx; protected Transaction tx;
protected boolean xa; protected boolean xa;
@ -383,6 +386,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
consumers.clear(); consumers.clear();
producers.clear();
if (closeables != null) { if (closeables != null) {
for (Closeable closeable : closeables) { for (Closeable closeable : closeables) {
@ -1729,4 +1733,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
return new Pair<>(address, defaultRoutingTypes); return new Pair<>(address, defaultRoutingTypes);
} }
@Override
public void addProducer(ServerProducer serverProducer) {
serverProducer.setSessionID(getName());
serverProducer.setConnectionID(getConnectionID().toString());
producers.put(serverProducer.getID(), serverProducer);
}
@Override
public void removeProducer(String ID) {
producers.remove(ID);
}
@Override
public Map<String, ServerProducer> getServerProducers() {
return Collections.unmodifiableMap(new HashMap(producers));
}
@Override
public String getDefaultAddress() {
return defaultAddress != null ? defaultAddress.toString() : null;
}
} }

View File

@ -891,6 +891,47 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
public String listAddresses(@Parameter(name = "separator", desc = "Separator used on the string listing") String separator) throws Exception { public String listAddresses(@Parameter(name = "separator", desc = "Separator used on the string listing") String separator) throws Exception {
return (String) proxy.invokeOperation("listAddresses", separator); return (String) proxy.invokeOperation("listAddresses", separator);
} }
@Override
public String listConnections(String filter, int page, int pageSize) throws Exception {
return (String) proxy.invokeOperation("listAddresses", filter, page, pageSize);
}
@Override
public String listSessions(@Parameter(name = "Filter String") String filter,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception {
return null;
}
@Override
public String listConsumers(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception {
return null;
}
@Override
public String listProducers(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception {
return null;
}
@Override
public String listAddresses(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception {
return null;
}
@Override
public String listQueues(@Parameter(name = "Options") String options,
@Parameter(name = "Page Number") int page,
@Parameter(name = "Page Size") int pageSize) throws Exception {
return null;
}
}; };
} }