This closes #64 Open Wire Advisory Support

This commit is contained in:
Clebert Suconic 2015-07-09 10:23:25 -04:00
commit 2a4e9f191a
6 changed files with 223 additions and 77 deletions

View File

@ -34,6 +34,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@ -84,7 +90,7 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener
{
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@ -121,6 +127,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
{
this.factory = factory;
@ -130,6 +138,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
wireFactory.setCacheEnabled(false);
brokerState = new BrokerState();
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
ManagementService service = server.getManagementService();
if (service != null)
{
service.addNotificationListener(this);
}
}
@ -603,6 +616,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
amqSession.initialize();
amqSession.setInternal(internal);
sessions.put(ss.getSessionId(), amqSession);
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
return amqSession;
}
@ -783,4 +797,51 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
{
transactions.put(txId, amqSession);
}
//advisory support
@Override
public void onNotification(Notification notif)
{
try
{
if (notif.getType() instanceof CoreNotificationType)
{
CoreNotificationType type = (CoreNotificationType)notif.getType();
switch (type)
{
case CONSUMER_SLOW:
fireSlowConsumer(notif);
break;
default:
break;
}
}
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
}
}
private void fireSlowConsumer(Notification notif) throws Exception
{
SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
AMQSession session = sessions.get(sessionId);
AMQConsumer consumer = session.getConsumer(coreConsumerId);
ActiveMQDestination destination = consumer.getDestination();
if (!AdvisorySupport.isAdvisoryTopic(destination))
{
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
ConnectionId connId = sessionId.getParentId();
AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId);
OpenWireConnection conn = cc.getConnection();
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId());
}
}
}

View File

@ -390,4 +390,14 @@ public class AMQConsumer implements BrowserListener
session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
}
}
public org.apache.activemq.command.ActiveMQDestination getDestination()
{
return actualDest;
}
public ConsumerInfo getInfo()
{
return info;
}
}

View File

@ -506,6 +506,11 @@ public class AMQSession implements SessionCallback
this.coreSession.close(false);
}
public AMQConsumer getConsumer(Long coreConsumerId)
{
return consumers.get(coreConsumerId);
}
private class SendRetryTask implements Runnable
{
private ServerMessage coreMsg;

View File

@ -22,110 +22,41 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.HealthView;
import org.apache.activemq.broker.jmx.HealthViewMBean;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.JobSchedulerView;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.Log4JConfigView;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
* Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
@ -161,6 +92,8 @@ public class BrokerService implements Service
public Set<Integer> extraConnectors = new HashSet<Integer>();
private File dataDirectoryFile;
private PolicyMap destinationPolicy;
static
{
InputStream in;
@ -337,7 +270,6 @@ public class BrokerService implements Service
public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception
{
System.out.println(">>>> making sure dest exits: " + activemqDestination);
ArtemisBrokerWrapper hqBroker = (ArtemisBrokerWrapper) this.broker;
//it can be null
if (activemqDestination == null)
@ -404,6 +336,7 @@ public class BrokerService implements Service
public void setDestinationPolicy(PolicyMap policyMap)
{
this.destinationPolicy = policyMap;
}
public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup)
@ -607,7 +540,7 @@ public class BrokerService implements Service
public PolicyMap getDestinationPolicy()
{
return null;
return this.destinationPolicy;
}
public void setTransportConnectorURIs(String[] transportConnectorURIs)

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -32,10 +33,14 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase
{
@ -62,14 +67,26 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
Configuration serverConfig = server.getConfiguration();
Set<TransportConfiguration> acceptors0 = serverConfig.getAcceptorConfigurations();
Iterator<TransportConfiguration> iter0 = acceptors0.iterator();
Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
Map<String, AddressSettings> addressSettingsMap = serverConfig.getAddressesSettings();
//do policy translation
PolicyMap policyMap = this.bservice.getDestinationPolicy();
if (policyMap != null)
{
translatePolicyMap(serverConfig, policyMap);
}
String match = "jms.queue.#";
AddressSettings dlaSettings = new AddressSettings();
AddressSettings commonSettings = addressSettingsMap.get(match);
if (commonSettings == null)
{
commonSettings = new AddressSettings();
addressSettingsMap.put(match, commonSettings);
}
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
dlaSettings.setDeadLetterAddress(dla);
addressSettings.put(match, dlaSettings);
commonSettings.setDeadLetterAddress(dla);
serverConfig.getAcceptorConfigurations().add(transportConfiguration);
if (this.bservice.enableSsl())
@ -177,6 +194,47 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
}
private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap)
{
List allEntries = policyMap.getAllEntries();
for (Object o : allEntries)
{
PolicyEntry entry = (PolicyEntry)o;
org.apache.activemq.command.ActiveMQDestination targetDest = entry.getDestination();
String match = getCorePattern(targetDest);
Map<String, AddressSettings> settingsMap = serverConfig.getAddressesSettings();
AddressSettings settings = settingsMap.get(match);
if (settings == null)
{
settings = new AddressSettings();
settingsMap.put(match, settings);
}
if (entry.isAdvisoryForSlowConsumers())
{
settings.setSlowConsumerThreshold(1000);
settings.setSlowConsumerCheckPeriod(1);
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
}
}
}
private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest)
{
String physicalName = dest.getPhysicalName();
String pattern = physicalName.replace(">", "#");
if (dest.isTopic())
{
pattern = "jms.topic." + pattern;
}
else
{
pattern = "jms.queue." + pattern;
}
return pattern;
}
@Override
public void stop() throws Exception
{

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapEntry;
/**
* Represents a destination based configuration of policies so that individual
* destinations or wildcard hierarchies of destinations can be configured using
* different policies.
*
* @org.apache.xbean.XBean
*/
public class PolicyMap extends DestinationMap
{
private PolicyEntry defaultEntry;
private List allEntries = new ArrayList();
public PolicyEntry getEntryFor(ActiveMQDestination destination)
{
PolicyEntry answer = (PolicyEntry) chooseValue(destination);
if (answer == null)
{
answer = getDefaultEntry();
}
return answer;
}
/**
* Sets the individual entries on the policy map
*
* @org.apache.xbean.ElementType class="org.apache.activemq.broker.region.policy.PolicyEntry"
*/
public void setPolicyEntries(List entries)
{
super.setEntries(entries);
allEntries.addAll(entries);
}
public List getAllEntries()
{
return allEntries;
}
public PolicyEntry getDefaultEntry()
{
return defaultEntry;
}
public void setDefaultEntry(PolicyEntry defaultEntry)
{
this.defaultEntry = defaultEntry;
}
protected Class<? extends DestinationMapEntry> getEntryClass()
{
return PolicyEntry.class;
}
}