diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 8c1cbe7188..a34168c9a8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -34,6 +34,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.management.Notification; +import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -84,7 +90,7 @@ import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.LongSequenceGenerator; -public class OpenWireProtocolManager implements ProtocolManager +public class OpenWireProtocolManager implements ProtocolManager, NotificationListener { private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator(); @@ -121,6 +127,8 @@ public class OpenWireProtocolManager implements ProtocolManager private Map transactions = new ConcurrentHashMap(); + private Map sessionIdMap = new ConcurrentHashMap(); + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; @@ -130,6 +138,11 @@ public class OpenWireProtocolManager implements ProtocolManager wireFactory.setCacheEnabled(false); brokerState = new BrokerState(); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); + ManagementService service = server.getManagementService(); + if (service != null) + { + service.addNotificationListener(this); + } } @@ -603,6 +616,7 @@ public class OpenWireProtocolManager implements ProtocolManager amqSession.initialize(); amqSession.setInternal(internal); sessions.put(ss.getSessionId(), amqSession); + sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId()); return amqSession; } @@ -783,4 +797,51 @@ public class OpenWireProtocolManager implements ProtocolManager { transactions.put(txId, amqSession); } + + //advisory support + @Override + public void onNotification(Notification notif) + { + try + { + if (notif.getType() instanceof CoreNotificationType) + { + CoreNotificationType type = (CoreNotificationType)notif.getType(); + switch (type) + { + case CONSUMER_SLOW: + fireSlowConsumer(notif); + break; + default: + break; + } + } + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e); + } + } + + private void fireSlowConsumer(Notification notif) throws Exception + { + SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME); + Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME); + SessionId sessionId = sessionIdMap.get(coreSessionId.toString()); + AMQSession session = sessions.get(sessionId); + AMQConsumer consumer = session.getConsumer(coreConsumerId); + ActiveMQDestination destination = consumer.getDestination(); + + if (!AdvisorySupport.isAdvisoryTopic(destination)) + { + ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination); + ConnectionId connId = sessionId.getParentId(); + AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId); + OpenWireConnection conn = cc.getConnection(); + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString()); + + fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId()); + } + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 523a9fbf0d..1292aee78f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -390,4 +390,14 @@ public class AMQConsumer implements BrowserListener session.getCoreSession().acknowledge(nativeId, lastMi.nativeId); } } + + public org.apache.activemq.command.ActiveMQDestination getDestination() + { + return actualDest; + } + + public ConsumerInfo getInfo() + { + return info; + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 3b166f0669..4f951fe95c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -506,6 +506,11 @@ public class AMQSession implements SessionCallback this.coreSession.close(false); } + public AMQConsumer getConsumer(Long coreConsumerId) + { + return consumers.get(coreConsumerId); + } + private class SendRetryTask implements Runnable { private ServerMessage coreMsg; diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index dd34769b9f..e9d84ef281 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -22,110 +22,41 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.security.Provider; -import java.security.Security; import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionMetaData; -import org.apache.activemq.ConfigurationException; import org.apache.activemq.Service; -import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; -import org.apache.activemq.broker.cluster.ConnectionSplitBroker; -import org.apache.activemq.broker.jmx.AnnotatedMBean; -import org.apache.activemq.broker.jmx.BrokerMBeanSupport; import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.broker.jmx.ConnectorView; -import org.apache.activemq.broker.jmx.ConnectorViewMBean; -import org.apache.activemq.broker.jmx.HealthView; -import org.apache.activemq.broker.jmx.HealthViewMBean; -import org.apache.activemq.broker.jmx.JmsConnectorView; -import org.apache.activemq.broker.jmx.JobSchedulerView; -import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; -import org.apache.activemq.broker.jmx.Log4JConfigView; -import org.apache.activemq.broker.jmx.ManagedRegionBroker; import org.apache.activemq.broker.jmx.ManagementContext; -import org.apache.activemq.broker.jmx.NetworkConnectorView; -import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; -import org.apache.activemq.broker.jmx.ProxyConnectorView; -import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFactory; -import org.apache.activemq.broker.region.DestinationFactoryImpl; import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.virtual.MirroredQueue; -import org.apache.activemq.broker.region.virtual.VirtualDestination; -import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; -import org.apache.activemq.broker.region.virtual.VirtualTopic; import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.broker.scheduler.SchedulerBroker; -import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.BrokerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.network.ConnectionFilter; -import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.jms.JmsConnector; -import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.proxy.ProxyConnector; import org.apache.activemq.security.MessageAuthorizationPolicy; -import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.store.JournaledStore; import org.apache.activemq.store.PListStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapterFactory; -import org.apache.activemq.store.memory.MemoryPersistenceAdapter; -import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.TransportFactorySupport; import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.BrokerSupport; -import org.apache.activemq.util.DefaultIOExceptionHandler; import org.apache.activemq.util.IOExceptionHandler; -import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.ThreadPoolUtils; -import org.apache.activemq.util.TimeUtils; -import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; /** * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a @@ -161,6 +92,8 @@ public class BrokerService implements Service public Set extraConnectors = new HashSet(); private File dataDirectoryFile; + private PolicyMap destinationPolicy; + static { InputStream in; @@ -337,7 +270,6 @@ public class BrokerService implements Service public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception { - System.out.println(">>>> making sure dest exits: " + activemqDestination); ArtemisBrokerWrapper hqBroker = (ArtemisBrokerWrapper) this.broker; //it can be null if (activemqDestination == null) @@ -404,6 +336,7 @@ public class BrokerService implements Service public void setDestinationPolicy(PolicyMap policyMap) { + this.destinationPolicy = policyMap; } public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) @@ -607,7 +540,7 @@ public class BrokerService implements Service public PolicyMap getDestinationPolicy() { - return null; + return this.destinationPolicy; } public void setTransportConnectorURIs(String[] transportConnectorURIs) diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 822faed30a..ced78570b5 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,10 +33,14 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; public class ArtemisBrokerWrapper extends ArtemisBrokerBase { @@ -62,14 +67,26 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase Configuration serverConfig = server.getConfiguration(); Set acceptors0 = serverConfig.getAcceptorConfigurations(); - Iterator iter0 = acceptors0.iterator(); - Map addressSettings = serverConfig.getAddressesSettings(); + Map addressSettingsMap = serverConfig.getAddressesSettings(); + + //do policy translation + PolicyMap policyMap = this.bservice.getDestinationPolicy(); + + if (policyMap != null) + { + translatePolicyMap(serverConfig, policyMap); + } + String match = "jms.queue.#"; - AddressSettings dlaSettings = new AddressSettings(); + AddressSettings commonSettings = addressSettingsMap.get(match); + if (commonSettings == null) + { + commonSettings = new AddressSettings(); + addressSettingsMap.put(match, commonSettings); + } SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ"); - dlaSettings.setDeadLetterAddress(dla); - addressSettings.put(match, dlaSettings); + commonSettings.setDeadLetterAddress(dla); serverConfig.getAcceptorConfigurations().add(transportConfiguration); if (this.bservice.enableSsl()) @@ -177,6 +194,47 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase } + private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap) + { + List allEntries = policyMap.getAllEntries(); + for (Object o : allEntries) + { + PolicyEntry entry = (PolicyEntry)o; + org.apache.activemq.command.ActiveMQDestination targetDest = entry.getDestination(); + String match = getCorePattern(targetDest); + Map settingsMap = serverConfig.getAddressesSettings(); + AddressSettings settings = settingsMap.get(match); + if (settings == null) + { + settings = new AddressSettings(); + settingsMap.put(match, settings); + } + + if (entry.isAdvisoryForSlowConsumers()) + { + settings.setSlowConsumerThreshold(1000); + settings.setSlowConsumerCheckPeriod(1); + settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + } + } + } + + private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest) + { + String physicalName = dest.getPhysicalName(); + String pattern = physicalName.replace(">", "#"); + if (dest.isTopic()) + { + pattern = "jms.topic." + pattern; + } + else + { + pattern = "jms.queue." + pattern; + } + + return pattern; + } + @Override public void stop() throws Exception { diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java new file mode 100644 index 0000000000..e48f2d906c --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.filter.DestinationMapEntry; + +/** + * Represents a destination based configuration of policies so that individual + * destinations or wildcard hierarchies of destinations can be configured using + * different policies. + * + * @org.apache.xbean.XBean + */ +public class PolicyMap extends DestinationMap +{ + + private PolicyEntry defaultEntry; + private List allEntries = new ArrayList(); + + public PolicyEntry getEntryFor(ActiveMQDestination destination) + { + PolicyEntry answer = (PolicyEntry) chooseValue(destination); + if (answer == null) + { + answer = getDefaultEntry(); + } + return answer; + } + + /** + * Sets the individual entries on the policy map + * + * @org.apache.xbean.ElementType class="org.apache.activemq.broker.region.policy.PolicyEntry" + */ + public void setPolicyEntries(List entries) + { + super.setEntries(entries); + allEntries.addAll(entries); + } + + public List getAllEntries() + { + return allEntries; + } + + public PolicyEntry getDefaultEntry() + { + return defaultEntry; + } + + public void setDefaultEntry(PolicyEntry defaultEntry) + { + this.defaultEntry = defaultEntry; + } + + protected Class getEntryClass() + { + return PolicyEntry.class; + } +}