ARTEMIS-149 Advisory Message Support
Adding functions to send advisory messages.
This commit is contained in:
parent
17cc62bca2
commit
7cf58b1e88
|
@ -34,6 +34,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
|
@ -84,7 +90,7 @@ import org.apache.activemq.util.IdGenerator;
|
|||
import org.apache.activemq.util.InetAddressUtil;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
|
||||
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
||||
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener
|
||||
{
|
||||
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
|
||||
private static final IdGenerator ID_GENERATOR = new IdGenerator();
|
||||
|
@ -121,6 +127,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
|||
|
||||
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
|
||||
|
||||
private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
|
||||
|
||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
|
||||
{
|
||||
this.factory = factory;
|
||||
|
@ -130,6 +138,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
|||
wireFactory.setCacheEnabled(false);
|
||||
brokerState = new BrokerState();
|
||||
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
|
||||
ManagementService service = server.getManagementService();
|
||||
if (service != null)
|
||||
{
|
||||
service.addNotificationListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -603,6 +616,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
|||
amqSession.initialize();
|
||||
amqSession.setInternal(internal);
|
||||
sessions.put(ss.getSessionId(), amqSession);
|
||||
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
|
||||
return amqSession;
|
||||
}
|
||||
|
||||
|
@ -783,4 +797,51 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
|||
{
|
||||
transactions.put(txId, amqSession);
|
||||
}
|
||||
|
||||
//advisory support
|
||||
@Override
|
||||
public void onNotification(Notification notif)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (notif.getType() instanceof CoreNotificationType)
|
||||
{
|
||||
CoreNotificationType type = (CoreNotificationType)notif.getType();
|
||||
switch (type)
|
||||
{
|
||||
case CONSUMER_SLOW:
|
||||
fireSlowConsumer(notif);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void fireSlowConsumer(Notification notif) throws Exception
|
||||
{
|
||||
SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
|
||||
Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
|
||||
SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
|
||||
AMQSession session = sessions.get(sessionId);
|
||||
AMQConsumer consumer = session.getConsumer(coreConsumerId);
|
||||
ActiveMQDestination destination = consumer.getDestination();
|
||||
|
||||
if (!AdvisorySupport.isAdvisoryTopic(destination))
|
||||
{
|
||||
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
|
||||
ConnectionId connId = sessionId.getParentId();
|
||||
AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId);
|
||||
OpenWireConnection conn = cc.getConnection();
|
||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
|
||||
|
||||
fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -390,4 +390,14 @@ public class AMQConsumer implements BrowserListener
|
|||
session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
|
||||
}
|
||||
}
|
||||
|
||||
public org.apache.activemq.command.ActiveMQDestination getDestination()
|
||||
{
|
||||
return actualDest;
|
||||
}
|
||||
|
||||
public ConsumerInfo getInfo()
|
||||
{
|
||||
return info;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -506,6 +506,11 @@ public class AMQSession implements SessionCallback
|
|||
this.coreSession.close(false);
|
||||
}
|
||||
|
||||
public AMQConsumer getConsumer(Long coreConsumerId)
|
||||
{
|
||||
return consumers.get(coreConsumerId);
|
||||
}
|
||||
|
||||
private class SendRetryTask implements Runnable
|
||||
{
|
||||
private ServerMessage coreMsg;
|
||||
|
|
|
@ -22,110 +22,41 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.Provider;
|
||||
import java.security.Security;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionMetaData;
|
||||
import org.apache.activemq.ConfigurationException;
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.advisory.AdvisoryBroker;
|
||||
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
|
||||
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
|
||||
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
||||
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
||||
import org.apache.activemq.broker.jmx.BrokerView;
|
||||
import org.apache.activemq.broker.jmx.ConnectorView;
|
||||
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
|
||||
import org.apache.activemq.broker.jmx.HealthView;
|
||||
import org.apache.activemq.broker.jmx.HealthViewMBean;
|
||||
import org.apache.activemq.broker.jmx.JmsConnectorView;
|
||||
import org.apache.activemq.broker.jmx.JobSchedulerView;
|
||||
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
|
||||
import org.apache.activemq.broker.jmx.Log4JConfigView;
|
||||
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
|
||||
import org.apache.activemq.broker.jmx.ManagementContext;
|
||||
import org.apache.activemq.broker.jmx.NetworkConnectorView;
|
||||
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
|
||||
import org.apache.activemq.broker.jmx.ProxyConnectorView;
|
||||
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFactory;
|
||||
import org.apache.activemq.broker.region.DestinationFactoryImpl;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.virtual.MirroredQueue;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualTopic;
|
||||
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
|
||||
import org.apache.activemq.broker.scheduler.SchedulerBroker;
|
||||
import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.BrokerId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.filter.DestinationFilter;
|
||||
import org.apache.activemq.network.ConnectionFilter;
|
||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.network.jms.JmsConnector;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.proxy.ProxyConnector;
|
||||
import org.apache.activemq.security.MessageAuthorizationPolicy;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.activemq.store.JournaledStore;
|
||||
import org.apache.activemq.store.PListStore;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.PersistenceAdapterFactory;
|
||||
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||
import org.apache.activemq.thread.Scheduler;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.TransportFactorySupport;
|
||||
import org.apache.activemq.transport.TransportServer;
|
||||
import org.apache.activemq.transport.vm.VMTransportFactory;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.util.BrokerSupport;
|
||||
import org.apache.activemq.util.DefaultIOExceptionHandler;
|
||||
import org.apache.activemq.util.IOExceptionHandler;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.InetAddressUtil;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.util.ThreadPoolUtils;
|
||||
import org.apache.activemq.util.TimeUtils;
|
||||
import org.apache.activemq.util.URISupport;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
/**
|
||||
* Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
|
||||
|
@ -161,6 +92,8 @@ public class BrokerService implements Service
|
|||
public Set<Integer> extraConnectors = new HashSet<Integer>();
|
||||
private File dataDirectoryFile;
|
||||
|
||||
private PolicyMap destinationPolicy;
|
||||
|
||||
static
|
||||
{
|
||||
InputStream in;
|
||||
|
@ -337,7 +270,6 @@ public class BrokerService implements Service
|
|||
|
||||
public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception
|
||||
{
|
||||
System.out.println(">>>> making sure dest exits: " + activemqDestination);
|
||||
ArtemisBrokerWrapper hqBroker = (ArtemisBrokerWrapper) this.broker;
|
||||
//it can be null
|
||||
if (activemqDestination == null)
|
||||
|
@ -404,6 +336,7 @@ public class BrokerService implements Service
|
|||
|
||||
public void setDestinationPolicy(PolicyMap policyMap)
|
||||
{
|
||||
this.destinationPolicy = policyMap;
|
||||
}
|
||||
|
||||
public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup)
|
||||
|
@ -607,7 +540,7 @@ public class BrokerService implements Service
|
|||
|
||||
public PolicyMap getDestinationPolicy()
|
||||
{
|
||||
return null;
|
||||
return this.destinationPolicy;
|
||||
}
|
||||
|
||||
public void setTransportConnectorURIs(String[] transportConnectorURIs)
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.net.URI;
|
|||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -32,10 +33,14 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
|||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
|
||||
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
|
||||
public class ArtemisBrokerWrapper extends ArtemisBrokerBase
|
||||
{
|
||||
|
@ -62,14 +67,26 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
|
|||
Configuration serverConfig = server.getConfiguration();
|
||||
|
||||
Set<TransportConfiguration> acceptors0 = serverConfig.getAcceptorConfigurations();
|
||||
Iterator<TransportConfiguration> iter0 = acceptors0.iterator();
|
||||
|
||||
Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
|
||||
Map<String, AddressSettings> addressSettingsMap = serverConfig.getAddressesSettings();
|
||||
|
||||
//do policy translation
|
||||
PolicyMap policyMap = this.bservice.getDestinationPolicy();
|
||||
|
||||
if (policyMap != null)
|
||||
{
|
||||
translatePolicyMap(serverConfig, policyMap);
|
||||
}
|
||||
|
||||
String match = "jms.queue.#";
|
||||
AddressSettings dlaSettings = new AddressSettings();
|
||||
AddressSettings commonSettings = addressSettingsMap.get(match);
|
||||
if (commonSettings == null)
|
||||
{
|
||||
commonSettings = new AddressSettings();
|
||||
addressSettingsMap.put(match, commonSettings);
|
||||
}
|
||||
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
|
||||
dlaSettings.setDeadLetterAddress(dla);
|
||||
addressSettings.put(match, dlaSettings);
|
||||
commonSettings.setDeadLetterAddress(dla);
|
||||
|
||||
serverConfig.getAcceptorConfigurations().add(transportConfiguration);
|
||||
if (this.bservice.enableSsl())
|
||||
|
@ -177,6 +194,47 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
|
|||
|
||||
}
|
||||
|
||||
private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap)
|
||||
{
|
||||
List allEntries = policyMap.getAllEntries();
|
||||
for (Object o : allEntries)
|
||||
{
|
||||
PolicyEntry entry = (PolicyEntry)o;
|
||||
org.apache.activemq.command.ActiveMQDestination targetDest = entry.getDestination();
|
||||
String match = getCorePattern(targetDest);
|
||||
Map<String, AddressSettings> settingsMap = serverConfig.getAddressesSettings();
|
||||
AddressSettings settings = settingsMap.get(match);
|
||||
if (settings == null)
|
||||
{
|
||||
settings = new AddressSettings();
|
||||
settingsMap.put(match, settings);
|
||||
}
|
||||
|
||||
if (entry.isAdvisoryForSlowConsumers())
|
||||
{
|
||||
settings.setSlowConsumerThreshold(1000);
|
||||
settings.setSlowConsumerCheckPeriod(1);
|
||||
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest)
|
||||
{
|
||||
String physicalName = dest.getPhysicalName();
|
||||
String pattern = physicalName.replace(">", "#");
|
||||
if (dest.isTopic())
|
||||
{
|
||||
pattern = "jms.topic." + pattern;
|
||||
}
|
||||
else
|
||||
{
|
||||
pattern = "jms.queue." + pattern;
|
||||
}
|
||||
|
||||
return pattern;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception
|
||||
{
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.region.policy;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.filter.DestinationMap;
|
||||
import org.apache.activemq.filter.DestinationMapEntry;
|
||||
|
||||
/**
|
||||
* Represents a destination based configuration of policies so that individual
|
||||
* destinations or wildcard hierarchies of destinations can be configured using
|
||||
* different policies.
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
*/
|
||||
public class PolicyMap extends DestinationMap
|
||||
{
|
||||
|
||||
private PolicyEntry defaultEntry;
|
||||
private List allEntries = new ArrayList();
|
||||
|
||||
public PolicyEntry getEntryFor(ActiveMQDestination destination)
|
||||
{
|
||||
PolicyEntry answer = (PolicyEntry) chooseValue(destination);
|
||||
if (answer == null)
|
||||
{
|
||||
answer = getDefaultEntry();
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the individual entries on the policy map
|
||||
*
|
||||
* @org.apache.xbean.ElementType class="org.apache.activemq.broker.region.policy.PolicyEntry"
|
||||
*/
|
||||
public void setPolicyEntries(List entries)
|
||||
{
|
||||
super.setEntries(entries);
|
||||
allEntries.addAll(entries);
|
||||
}
|
||||
|
||||
public List getAllEntries()
|
||||
{
|
||||
return allEntries;
|
||||
}
|
||||
|
||||
public PolicyEntry getDefaultEntry()
|
||||
{
|
||||
return defaultEntry;
|
||||
}
|
||||
|
||||
public void setDefaultEntry(PolicyEntry defaultEntry)
|
||||
{
|
||||
this.defaultEntry = defaultEntry;
|
||||
}
|
||||
|
||||
protected Class<? extends DestinationMapEntry> getEntryClass()
|
||||
{
|
||||
return PolicyEntry.class;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue