From 204f91f9355130f8b6d6edd9d9d701e30cff5ea8 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 3 Feb 2010 08:02:49 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-2571 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905926 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 25 ++- .../activemq/advisory/AdvisoryBroker.java | 23 ++- .../activemq/broker/BrokerBroadcaster.java | 28 ++- .../apache/activemq/broker/BrokerFilter.java | 4 +- .../apache/activemq/broker/BrokerService.java | 6 +- .../apache/activemq/broker/EmptyBroker.java | 2 +- .../apache/activemq/broker/ErrorBroker.java | 2 +- .../activemq/broker/MutableBrokerFilter.java | 4 +- .../activemq/broker/jmx/BrokerView.java | 6 +- .../broker/region/AbstractRegion.java | 162 ++++++++++-------- .../apache/activemq/broker/region/Region.java | 4 +- .../activemq/broker/region/RegionBroker.java | 16 +- .../activemq/broker/region/TopicRegion.java | 10 +- .../broker/util/LoggingBrokerPlugin.java | 48 +++++- .../view/DestinationDotFileInterceptor.java | 8 +- .../security/AuthorizationBroker.java | 25 +-- .../region/DestinationRemoveRestartTest.java | 5 +- .../org/apache/activemq/bugs/AMQ2571Test.java | 4 +- .../apache/activemq/perf/SimpleQueueTest.java | 2 +- .../activemq/xbean/XBeanConfigTest.java | 8 +- 20 files changed, 246 insertions(+), 146 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 003a92e317..6325b9d66a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -33,7 +33,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -41,6 +40,7 @@ import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; @@ -51,8 +51,7 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.XAConnection; -import javax.jms.InvalidDestinationException; - +import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -97,7 +96,6 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceSupport; -import org.apache.activemq.advisory.DestinationSource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -182,9 +180,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // Assume that protocol is the latest. Change to the actual protocol // version when a WireFormatInfo is received. - private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); - private long timeCreated; - private ConnectionAudit connectionAudit = new ConnectionAudit(); + private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); + private final long timeCreated; + private final ConnectionAudit connectionAudit = new ConnectionAudit(); private DestinationSource destinationSource; private final Object ensureConnectionInfoSentMutex = new Object(); private boolean useDedicatedTaskRunner; @@ -1906,12 +1904,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon activeTempDestinations.remove(destination); - DestinationInfo info = new DestinationInfo(); - info.setConnectionId(this.info.getConnectionId()); - info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); - info.setDestination(destination); - info.setTimeout(0); - syncSendPacket(info); + DestinationInfo destInfo = new DestinationInfo(); + destInfo.setConnectionId(this.info.getConnectionId()); + destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); + destInfo.setDestination(destination); + destInfo.setTimeout(0); + syncSendPacket(destInfo); } public boolean isDeleted(ActiveMQDestination dest) { @@ -2199,6 +2197,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.copyMessageOnSend = copyMessageOnSend; } + @Override public String toString() { return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index ef26e54309..758a81e5d7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -20,7 +20,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; @@ -73,6 +72,7 @@ public class AdvisoryBroker extends BrokerFilter { advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { super.addConnection(context, info); @@ -85,6 +85,7 @@ public class AdvisoryBroker extends BrokerFilter { connections.put(copy.getConnectionId(), copy); } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { Subscription answer = super.addConsumer(context, info); @@ -138,6 +139,7 @@ public class AdvisoryBroker extends BrokerFilter { return answer; } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.addProducer(context, info); @@ -149,8 +151,9 @@ public class AdvisoryBroker extends BrokerFilter { } } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - Destination answer = super.addDestination(context, destination); + @Override + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { + Destination answer = super.addDestination(context, destination,create); if (!AdvisorySupport.isAdvisoryTopic(destination)) { DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); DestinationInfo previous = destinations.putIfAbsent(destination, info); @@ -162,6 +165,7 @@ public class AdvisoryBroker extends BrokerFilter { return answer; } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { ActiveMQDestination destination = info.getDestination(); next.addDestinationInfo(context, info); @@ -175,6 +179,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { super.removeDestination(context, destination, timeout); DestinationInfo info = destinations.remove(destination); @@ -195,6 +200,7 @@ public class AdvisoryBroker extends BrokerFilter { } + @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { super.removeDestinationInfo(context, destInfo); DestinationInfo info = destinations.remove(destInfo.getDestination()); @@ -216,6 +222,7 @@ public class AdvisoryBroker extends BrokerFilter { } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { super.removeConnection(context, info, error); @@ -224,6 +231,7 @@ public class AdvisoryBroker extends BrokerFilter { connections.remove(info.getConnectionId()); } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { super.removeConsumer(context, info); @@ -238,6 +246,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.removeProducer(context, info); @@ -252,6 +261,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void messageExpired(ConnectionContext context, MessageReference messageReference) { super.messageExpired(context, messageReference); try { @@ -268,6 +278,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void messageConsumed(ConnectionContext context, MessageReference messageReference) { super.messageConsumed(context, messageReference); try { @@ -282,6 +293,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { super.messageDelivered(context, messageReference); try { @@ -296,6 +308,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { super.messageDiscarded(context, messageReference); try { @@ -310,6 +323,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { super.slowConsumer(context, destination,subs); try { @@ -322,6 +336,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { super.fastProducer(context, producerInfo); try { @@ -334,6 +349,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void isFull(ConnectionContext context,Destination destination,Usage usage) { super.isFull(context,destination, usage); try { @@ -346,6 +362,7 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override public void nowMasterBroker() { super.nowMasterBroker(); try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java index 540148b35b..b16cfc2a07 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java @@ -43,6 +43,7 @@ public class BrokerBroadcaster extends BrokerFilter { super(next); } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { next.acknowledge(consumerExchange, ack); Broker brokers[] = getListeners(); @@ -51,6 +52,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { next.addConnection(context, info); Broker brokers[] = getListeners(); @@ -59,6 +61,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { Subscription answer = next.addConsumer(context, info); Broker brokers[] = getListeners(); @@ -68,6 +71,7 @@ public class BrokerBroadcaster extends BrokerFilter { return answer; } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { next.addProducer(context, info); Broker brokers[] = getListeners(); @@ -76,6 +80,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { next.commitTransaction(context, xid, onePhase); Broker brokers[] = getListeners(); @@ -84,6 +89,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { next.removeSubscription(context, info); Broker brokers[] = getListeners(); @@ -92,6 +98,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { int result = next.prepareTransaction(context, xid); Broker brokers[] = getListeners(); @@ -102,6 +109,7 @@ public class BrokerBroadcaster extends BrokerFilter { return result; } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { next.removeConnection(context, info, error); Broker brokers[] = getListeners(); @@ -110,6 +118,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { next.removeConsumer(context, info); Broker brokers[] = getListeners(); @@ -118,6 +127,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { next.removeProducer(context, info); Broker brokers[] = getListeners(); @@ -126,6 +136,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { next.rollbackTransaction(context, xid); Broker brokers[] = getListeners(); @@ -134,6 +145,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { next.send(producerExchange, messageSend); Broker brokers[] = getListeners(); @@ -142,6 +154,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { next.beginTransaction(context, xid); Broker brokers[] = getListeners(); @@ -150,6 +163,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { next.forgetTransaction(context, transactionId); Broker brokers[] = getListeners(); @@ -158,15 +172,17 @@ public class BrokerBroadcaster extends BrokerFilter { } } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - Destination result = next.addDestination(context, destination); + @Override + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { + Destination result = next.addDestination(context, destination,createIfTemporary); Broker brokers[] = getListeners(); for (int i = 0; i < brokers.length; i++) { - brokers[i].addDestination(context, destination); + brokers[i].addDestination(context, destination,createIfTemporary); } return result; } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { next.removeDestination(context, destination, timeout); Broker brokers[] = getListeners(); @@ -175,6 +191,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void start() throws Exception { next.start(); Broker brokers[] = getListeners(); @@ -183,6 +200,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void stop() throws Exception { next.stop(); Broker brokers[] = getListeners(); @@ -191,6 +209,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { next.addSession(context, info); Broker brokers[] = getListeners(); @@ -199,6 +218,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { next.removeSession(context, info); Broker brokers[] = getListeners(); @@ -207,6 +227,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void gc() { next.gc(); Broker brokers[] = getListeners(); @@ -215,6 +236,7 @@ public class BrokerBroadcaster extends BrokerFilter { } } + @Override public void addBroker(Connection connection, BrokerInfo info) { next.addBroker(connection, info); Broker brokers[] = getListeners(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 06cf2eb479..941d3cafd5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -139,8 +139,8 @@ public class BrokerFilter implements Broker { return next.getClients(); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - return next.addDestination(context, destination); + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { + return next.addDestination(context, destination,createIfTemporary); } public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index b3ec513805..cd93dd19d5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -1418,7 +1418,7 @@ public class BrokerService implements Service { * JMS name */ public Destination getDestination(ActiveMQDestination destination) throws Exception { - return getBroker().addDestination(getAdminConnectionContext(), destination); + return getBroker().addDestination(getAdminConnectionContext(), destination,false); } public void removeDestination(ActiveMQDestination destination) throws Exception { @@ -1886,7 +1886,7 @@ public class BrokerService implements Service { ConnectionContext adminConnectionContext = getAdminConnectionContext(); for (int i = 0; i < destinations.length; i++) { ActiveMQDestination destination = destinations[i]; - getBroker().addDestination(adminConnectionContext, destination); + getBroker().addDestination(adminConnectionContext, destination,true); } } } @@ -2054,7 +2054,7 @@ public class BrokerService implements Service { } while (iter.hasNext()) { ActiveMQDestination destination = (ActiveMQDestination) iter.next(); - broker.addDestination(adminConnectionContext, destination); + broker.addDestination(adminConnectionContext, destination,false); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 854303898a..44db511ba1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -134,7 +134,7 @@ public class EmptyBroker implements Broker { } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception { return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 3229d85269..5b2333e0cd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -137,7 +137,7 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception { throw new BrokerStoppedException(this.message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 7219a5fada..f1ce429963 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -145,8 +145,8 @@ public class MutableBrokerFilter implements Broker { return getNext().getClients(); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - return getNext().addDestination(context, destination); + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { + return getNext().addDestination(context, destination,createIfTemporary); } public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 1c655ee2be..1f23bfab69 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -23,9 +23,7 @@ import java.lang.reflect.Method; import java.net.URI; import java.net.URL; import java.util.concurrent.atomic.AtomicInteger; - import javax.management.ObjectName; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -237,11 +235,11 @@ public class BrokerView implements BrokerViewMBean { } public void addTopic(String name) throws Exception { - broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name)); + broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name),true); } public void addQueue(String name) throws Exception { - broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name)); + broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name),true); } public void removeTopic(String name) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 3dbed3a10e..2435a0f4a0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -67,8 +67,8 @@ public abstract class AbstractRegion implements Region { protected final Map consumerChangeMutexMap = new HashMap(); protected boolean started; - public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, - DestinationFactory destinationFactory) { + public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, + TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { if (broker == null) { throw new IllegalArgumentException("null broker"); } @@ -82,7 +82,7 @@ public abstract class AbstractRegion implements Region { this.destinationFactory = destinationFactory; } - public final void start() throws Exception { + public final void start() throws Exception { started = true; Set inactiveDests = getInactiveDestinations(); @@ -92,7 +92,7 @@ public abstract class AbstractRegion implements Region { ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBrokerService().getBroker()); context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.getBroker().addDestination(context, dest); + context.getBroker().addDestination(context, dest, false); } synchronized (destinationsMutex) { for (Iterator i = destinations.values().iterator(); i.hasNext();) { @@ -113,21 +113,27 @@ public abstract class AbstractRegion implements Region { destinations.clear(); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, + boolean createIfTemporary) throws Exception { LOG.debug(broker.getBrokerName() + " adding destination: " + destination); synchronized (destinationsMutex) { Destination dest = destinations.get(destination); if (dest == null) { - dest = createDestination(context, destination); - // intercept if there is a valid interceptor defined - DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); - if (destinationInterceptor != null) { - dest = destinationInterceptor.intercept(dest); + if (destination.isTemporary() == false || createIfTemporary) { + dest = createDestination(context, destination); + // intercept if there is a valid interceptor defined + DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); + if (destinationInterceptor != null) { + dest = destinationInterceptor.intercept(dest); + } + dest.start(); + destinations.put(destination, dest); + destinationMap.put(destination, dest); + addSubscriptionsForDestination(context, dest); + } + if (dest == null) { + throw new JMSException("The destination " + destination + " does not exist."); } - dest.start(); - destinations.put(destination, dest); - destinationMap.put(destination, dest); - addSubscriptionsForDestination(context, dest); } return dest; } @@ -136,8 +142,9 @@ public abstract class AbstractRegion implements Region { public Map getSubscriptions() { return subscriptions; } - - protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { + + protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) + throws Exception { List rc = new ArrayList(); // Add all consumers that are interested in the destination. @@ -152,7 +159,8 @@ public abstract class AbstractRegion implements Region { } - public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) + throws Exception { // No timeout.. then try to shut down right way, fails if there are // current subscribers. @@ -174,7 +182,7 @@ public abstract class AbstractRegion implements Region { } LOG.debug("Removing destination: " + destination); - + synchronized (destinationsMutex) { Destination dest = destinations.remove(destination); if (dest != null) { @@ -187,13 +195,13 @@ public abstract class AbstractRegion implements Region { } } destinationMap.removeAll(destination); - dispose(context,dest); + dispose(context, dest); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); if (destinationInterceptor != null) { destinationInterceptor.remove(dest); } - } else { + } else { LOG.debug("Destination doesn't exist: " + dest); } } @@ -217,11 +225,12 @@ public abstract class AbstractRegion implements Region { } public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); + LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + + info.getDestination()); ActiveMQDestination destination = info.getDestination(); if (destination != null && !destination.isPattern() && !destination.isComposite()) { // lets auto-create the destination - lookup(context, destination); + lookup(context, destination,true); } Object addGuard; @@ -235,7 +244,8 @@ public abstract class AbstractRegion implements Region { synchronized (addGuard) { Subscription o = subscriptions.get(info.getConsumerId()); if (o != null) { - LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); + LOG + .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); return o; } @@ -268,20 +278,20 @@ public abstract class AbstractRegion implements Region { // Add the subscription to all the matching queues. // But copy the matches first - to prevent deadlocks - ListaddList = new ArrayList(); - synchronized(destinationsMutex) { + List addList = new ArrayList(); + synchronized (destinationsMutex) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { - Destination dest = (Destination)iter.next(); + Destination dest = (Destination) iter.next(); addList.add(dest); } } - - for (Destination dest:addList) { + + for (Destination dest : addList) { dest.addSubscription(context, sub); } if (info.isBrowser()) { - ((QueueBrowserSubscription)sub).destinationsAdded(); + ((QueueBrowserSubscription) sub).destinationsAdded(); } return sub; @@ -309,24 +319,24 @@ public abstract class AbstractRegion implements Region { } public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); + LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + + info.getDestination()); Subscription sub = subscriptions.remove(info.getConsumerId()); - //The sub could be removed elsewhere - see ConnectionSplitBroker + // The sub could be removed elsewhere - see ConnectionSplitBroker if (sub != null) { // remove the subscription from all the matching queues. List removeList = new ArrayList(); synchronized (destinationsMutex) { - for (Iterator iter = destinationMap.get(info.getDestination()) - .iterator(); iter.hasNext();) { + for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { Destination dest = (Destination) iter.next(); removeList.add(dest); - + } } - for(Destination dest:removeList) { - dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); + for (Destination dest : removeList) { + dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); } destroySubscription(sub); @@ -348,7 +358,7 @@ public abstract class AbstractRegion implements Region { final ConnectionContext context = producerExchange.getConnectionContext(); if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { - final Destination regionDestination = lookup(context, messageSend.getDestination()); + final Destination regionDestination = lookup(context, messageSend.getDestination(),false); producerExchange.setRegionDestination(regionDestination); } @@ -358,13 +368,11 @@ public abstract class AbstractRegion implements Region { public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { Subscription sub = consumerExchange.getSubscription(); if (sub == null) { - sub = subscriptions.get(ack.getConsumerId()); + sub = subscriptions.get(ack.getConsumerId()); if (sub == null) { if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { - LOG.warn("Ack for non existent subscription, ack:" + ack); - throw new IllegalArgumentException( - "The subscription does not exist: " - + ack.getConsumerId()); + LOG.warn("Ack for non existent subscription, ack:" + ack); + throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); } else { return; } @@ -382,19 +390,19 @@ public abstract class AbstractRegion implements Region { return sub.pullMessage(context, pull); } - protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception { + protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception { Destination dest = null; synchronized (destinationsMutex) { dest = destinations.get(destination); } if (dest == null) { - if (autoCreateDestinations) { + if (isAutoCreateDestinations()) { // Try to auto create the destination... re-invoke broker // from the // top so that the proper security checks are performed. try { - context.getBroker().addDestination(context, destination); - dest = addDestination(context, destination); + context.getBroker().addDestination(context, destination, createTemporary); + dest = addDestination(context, destination, false); } catch (DestinationAlreadyExistsException e) { // if the destination already exists then lets ignore // this error @@ -417,18 +425,19 @@ public abstract class AbstractRegion implements Region { sub.processMessageDispatchNotification(messageDispatchNotification); } else { throw new JMSException("Slave broker out of sync with master - Subscription: " - + messageDispatchNotification.getConsumerId() - + " on " + messageDispatchNotification.getDestination() - + " does not exist for dispatch of message: " + + messageDispatchNotification.getConsumerId() + " on " + + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: " + messageDispatchNotification.getMessageId()); } } - + /* - * For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till - * the notification to ensure that the subscription chosen by the master is used. AMQ-2102 - */ - protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception { + * For a Queue/TempQueue, dispatch order is imperative to match acks, so the + * dispatch is deferred till the notification to ensure that the + * subscription chosen by the master is used. AMQ-2102 + */ + protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) + throws Exception { Destination dest = null; synchronized (destinationsMutex) { dest = destinations.get(messageDispatchNotification.getDestination()); @@ -436,13 +445,10 @@ public abstract class AbstractRegion implements Region { if (dest != null) { dest.processDispatchNotification(messageDispatchNotification); } else { - throw new JMSException( - "Slave broker out of sync with master - Destination: " - + messageDispatchNotification.getDestination() - + " does not exist for consumer " - + messageDispatchNotification.getConsumerId() - + " with message: " - + messageDispatchNotification.getMessageId()); + throw new JMSException("Slave broker out of sync with master - Destination: " + + messageDispatchNotification.getDestination() + " does not exist for consumer " + + messageDispatchNotification.getConsumerId() + " with message: " + + messageDispatchNotification.getMessageId()); } } @@ -461,7 +467,8 @@ public abstract class AbstractRegion implements Region { protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; - protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) + throws Exception { return destinationFactory.createDestination(context, destination, destinationStatistics); } @@ -472,8 +479,8 @@ public abstract class AbstractRegion implements Region { public void setAutoCreateDestinations(boolean autoCreateDestinations) { this.autoCreateDestinations = autoCreateDestinations; } - - public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ + + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { synchronized (destinationsMutex) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { Destination dest = (Destination) iter.next(); @@ -484,34 +491,37 @@ public abstract class AbstractRegion implements Region { /** * Removes a Producer. - * @param context the environment the operation is being executed under. - * @throws Exception TODO + * + * @param context + * the environment the operation is being executed under. + * @throws Exception + * TODO */ - public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { synchronized (destinationsMutex) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { - Destination dest = (Destination)iter.next(); + Destination dest = (Destination) iter.next(); dest.removeProducer(context, info); } } } - - protected void dispose(ConnectionContext context,Destination dest) throws Exception { + + protected void dispose(ConnectionContext context, Destination dest) throws Exception { dest.dispose(context); dest.stop(); destinationFactory.removeDestination(dest); } - - public void processConsumerControl(ConsumerBrokerExchange consumerExchange, - ConsumerControl control) { + + public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { Subscription sub = subscriptions.get(control.getConsumerId()); if (sub != null && sub instanceof AbstractSubscription) { - ((AbstractSubscription)sub).setPrefetchSize(control.getPrefetch()); + ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch()); if (LOG.isDebugEnabled()) { - LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " + control.getConsumerId()); + LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " + + control.getConsumerId()); } try { - lookup(consumerExchange.getConnectionContext(), control.getDestination()).wakeup(); + lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); } catch (Exception e) { LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java index c35f65c41b..07b82f1510 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java @@ -18,7 +18,6 @@ package org.apache.activemq.broker.region; import java.util.Map; import java.util.Set; - import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; @@ -49,10 +48,11 @@ public interface Region extends Service { * * @param context * @param destination the destination to create. + * @param createIfTemporary * @return TODO * @throws Exception TODO */ - Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception; + Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception; /** * Used to destroy a destination. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index c9982aa593..5869d4fd5a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -263,7 +263,7 @@ public class RegionBroker extends EmptyBroker { } @Override - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { Destination answer; @@ -274,16 +274,16 @@ public class RegionBroker extends EmptyBroker { switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: - answer = queueRegion.addDestination(context, destination); + answer = queueRegion.addDestination(context, destination,true); break; case ActiveMQDestination.TOPIC_TYPE: - answer = topicRegion.addDestination(context, destination); + answer = topicRegion.addDestination(context, destination,true); break; case ActiveMQDestination.TEMP_QUEUE_TYPE: - answer = tempQueueRegion.addDestination(context, destination); + answer = tempQueueRegion.addDestination(context, destination,create); break; case ActiveMQDestination.TEMP_TOPIC_TYPE: - answer = tempTopicRegion.addDestination(context, destination); + answer = tempTopicRegion.addDestination(context, destination,create); break; default: throw createUnknownDestinationTypeException(destination); @@ -321,7 +321,7 @@ public class RegionBroker extends EmptyBroker { @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { - addDestination(context, info.getDestination()); + addDestination(context, info.getDestination(),true); } @@ -349,7 +349,7 @@ public class RegionBroker extends EmptyBroker { if (destination != null) { // This seems to cause the destination to be added but without advisories firing... - context.getBroker().addDestination(context, destination); + context.getBroker().addDestination(context, destination,false); switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: queueRegion.addProducer(context, info); @@ -441,7 +441,7 @@ public class RegionBroker extends EmptyBroker { if (producerExchange.isMutable() || producerExchange.getRegion() == null) { ActiveMQDestination destination = message.getDestination(); // ensure the destination is registered with the RegionBroker - producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination); + producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false); Region region; switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index d9adb27513..3c86a4fadf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -21,10 +21,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import javax.jms.InvalidDestinationException; import javax.jms.JMSException; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -59,12 +57,13 @@ public class TopicRegion extends AbstractRegion { } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (info.isDurable()) { ActiveMQDestination destination = info.getDestination(); if (!destination.isPattern()) { // Make sure the destination is created. - lookup(context, destination); + lookup(context, destination,true); } String clientId = context.getClientId(); String subscriptionName = info.getSubscriptionName(); @@ -113,6 +112,7 @@ public class TopicRegion extends AbstractRegion { } } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (info.isDurable()) { @@ -127,6 +127,7 @@ public class TopicRegion extends AbstractRegion { } } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName()); DurableTopicSubscription sub = durableSubscriptions.get(key); @@ -151,6 +152,7 @@ public class TopicRegion extends AbstractRegion { super.removeConsumer(context, sub.getConsumerInfo()); } + @Override public String toString() { return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; } @@ -234,6 +236,7 @@ public class TopicRegion extends AbstractRegion { } } + @Override protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { ActiveMQDestination destination = info.getDestination(); @@ -290,6 +293,7 @@ public class TopicRegion extends AbstractRegion { return !info1.getDestination().equals(info2.getDestination()); } + @Override protected Set getInactiveDestinations() { Set inactiveDestinations = super.getInactiveDestinations(); for (Iterator iter = inactiveDestinations.iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index 6670a1672f..cd5a3e02da 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -18,7 +18,6 @@ package org.apache.activemq.broker.util; import java.io.IOException; import java.util.Set; - import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; @@ -147,6 +146,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements this.logInternalEvents = logInternalEvents; } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { if (isLogAll() || isLogConsumerEvents()) { @@ -162,6 +162,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.acknowledge(consumerExchange, ack); } + @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { if (isLogAll() || isLogConsumerEvents()) { @@ -171,6 +172,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return super.messagePull(context, pull); } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { @@ -179,6 +181,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.addConnection(context, info); } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { @@ -187,6 +190,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return super.addConsumer(context, info); } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { if (isLogAll() || isLogProducerEvents()) { @@ -195,6 +199,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.addProducer(context, info); } + @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { if (isLogAll() || isLogTransactionEvents()) { @@ -203,6 +208,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.commitTransaction(context, xid, onePhase); } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { @@ -211,6 +217,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeSubscription(context, info); } + @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { @@ -228,6 +235,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return result; } + @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { @@ -236,6 +244,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return super.prepareTransaction(context, xid); } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { if (isLogAll() || isLogConnectionEvents()) { @@ -244,6 +253,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeConnection(context, info, error); } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { @@ -252,6 +262,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeConsumer(context, info); } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { if (isLogAll() || isLogProducerEvents()) { @@ -260,6 +271,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeProducer(context, info); } + @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { @@ -268,6 +280,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.rollbackTransaction(context, xid); } + @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { if (isLogAll() || isLogProducerEvents()) { @@ -276,6 +289,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.send(producerExchange, messageSend); } + @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { @@ -284,6 +298,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.beginTransaction(context, xid); } + @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { if (isLogAll() || isLogTransactionEvents()) { @@ -293,6 +308,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.forgetTransaction(context, transactionId); } + @Override public Connection[] getClients() throws Exception { Connection[] result = super.getClients(); @@ -311,17 +327,19 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return super.getClients(); } + @Override public org.apache.activemq.broker.region.Destination addDestination( - ConnectionContext context, ActiveMQDestination destination) + ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":" + destination.getPhysicalName()); } - return super.addDestination(context, destination); + return super.addDestination(context, destination,create); } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { if (isLogAll() || isLogInternalEvents()) { @@ -332,6 +350,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeDestination(context, destination, timeout); } + @Override public ActiveMQDestination[] getDestinations() throws Exception { ActiveMQDestination[] result = super.getDestinations(); if (isLogAll() || isLogInternalEvents()) { @@ -349,6 +368,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return result; } + @Override public void start() throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Starting " + getBrokerName()); @@ -356,6 +376,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.start(); } + @Override public void stop() throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Stopping " + getBrokerName()); @@ -363,6 +384,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.stop(); } + @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { @@ -371,6 +393,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.addSession(context, info); } + @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { @@ -379,6 +402,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeSession(context, info); } + @Override public void addBroker(Connection connection, BrokerInfo info) { if (isLogAll() || isLogInternalEvents()) { LOG.info("Adding Broker " + info.getBrokerName()); @@ -386,6 +410,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.addBroker(connection, info); } + @Override public void removeBroker(Connection connection, BrokerInfo info) { if (isLogAll() || isLogInternalEvents()) { LOG.info("Removing Broker " + info.getBrokerName()); @@ -393,6 +418,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeBroker(connection, info); } + @Override public BrokerInfo[] getPeerBrokerInfos() { BrokerInfo[] result = super.getPeerBrokerInfos(); if (isLogAll() || isLogInternalEvents()) { @@ -410,6 +436,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return result; } + @Override public void preProcessDispatch(MessageDispatch messageDispatch) { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { LOG.info("preProcessDispatch :" + messageDispatch); @@ -417,6 +444,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.preProcessDispatch(messageDispatch); } + @Override public void postProcessDispatch(MessageDispatch messageDispatch) { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { LOG.info("postProcessDispatch :" + messageDispatch); @@ -424,6 +452,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.postProcessDispatch(messageDispatch); } + @Override public void processDispatchNotification( MessageDispatchNotification messageDispatchNotification) throws Exception { @@ -434,6 +463,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.processDispatchNotification(messageDispatchNotification); } + @Override public Set getDurableDestinations() { Set result = super.getDurableDestinations(); if (isLogAll() || isLogInternalEvents()) { @@ -451,6 +481,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements return result; } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { if (isLogAll() || isLogInternalEvents()) { @@ -459,6 +490,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.addDestinationInfo(context, info); } + @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { if (isLogAll() || isLogInternalEvents()) { @@ -467,6 +499,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.removeDestinationInfo(context, info); } + @Override public void messageExpired(ConnectionContext context, MessageReference message) { if (isLogAll() || isLogInternalEvents()) { @@ -480,6 +513,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.messageExpired(context, message); } + @Override public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogInternalEvents()) { @@ -492,6 +526,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements } } + @Override public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { @@ -500,6 +535,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.fastProducer(context, producerInfo); } + @Override public void isFull(ConnectionContext context, Destination destination, Usage usage) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { @@ -508,6 +544,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.isFull(context, destination, usage); } + @Override public void messageConsumed(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { @@ -521,6 +558,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.messageConsumed(context, messageReference); } + @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { @@ -534,6 +572,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.messageDelivered(context, messageReference); } + @Override public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogInternalEvents()) { @@ -547,6 +586,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.messageDiscarded(context, messageReference); } + @Override public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { @@ -561,6 +601,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.slowConsumer(context, destination, subs); } + @Override public void nowMasterBroker() { if (isLogAll() || isLogInternalEvents()) { LOG.info("Is now the master broker : " + getBrokerName()); @@ -568,6 +609,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements super.nowMasterBroker(); } + @Override public String toString() { StringBuffer buf = new StringBuffer(); buf.append("LoggingBrokerPlugin("); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java index 204f86d961..8b7e32c29e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java @@ -19,7 +19,6 @@ package org.apache.activemq.broker.view; import java.io.PrintWriter; import java.util.Collection; import java.util.Iterator; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; @@ -38,17 +37,20 @@ public class DestinationDotFileInterceptor extends DotFileInterceptorSupport { super(next, file); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - Destination answer = super.addDestination(context, destination); + @Override + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { + Destination answer = super.addDestination(context, destination,create); generateFile(); return answer; } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { super.removeDestination(context, destination, timeout); generateFile(); } + @Override protected void generateFile(PrintWriter writer) throws Exception { ActiveMQDestination[] destinations = getDestinations(); diff --git a/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index 2327b9a7d3..65b4670995 100644 --- a/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -17,7 +17,6 @@ package org.apache.activemq.security; import java.util.Set; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; @@ -47,20 +46,22 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB this.authorizationMap = authorizationMap; } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { - addDestination(context, info.getDestination()); + addDestination(context, info.getDestination(),true); super.addDestinationInfo(context, info); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - final SecurityContext securityContext = (SecurityContext)context.getSecurityContext(); + @Override + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { + final SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { throw new SecurityException("User is not authenticated."); } Destination existing = this.getDestinationMap().get(destination); if (existing != null) { - return super.addDestination(context, destination); + return super.addDestination(context, destination,create); } if (!securityContext.isBrokerContext()) { @@ -77,12 +78,13 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB } - return super.addDestination(context, destination); + return super.addDestination(context, destination,create); } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { - final SecurityContext securityContext = (SecurityContext)context.getSecurityContext(); + final SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { throw new SecurityException("User is not authenticated."); } @@ -99,9 +101,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB super.removeDestination(context, destination, timeout); } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - final SecurityContext subject = (SecurityContext)context.getSecurityContext(); + final SecurityContext subject = context.getSecurityContext(); if (subject == null) { throw new SecurityException("User is not authenticated."); } @@ -141,9 +144,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB return super.addConsumer(context, info); } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - SecurityContext subject = (SecurityContext)context.getSecurityContext(); + SecurityContext subject = context.getSecurityContext(); if (subject == null) { throw new SecurityException("User is not authenticated."); } @@ -164,8 +168,9 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB super.addProducer(context, info); } + @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - SecurityContext subject = (SecurityContext)producerExchange.getConnectionContext().getSecurityContext(); + SecurityContext subject = producerExchange.getConnectionContext().getSecurityContext(); if (subject == null) { throw new SecurityException("User is not authenticated."); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java index 25a2cbd1d9..fd97f87cc8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.broker.region; import junit.framework.Test; - import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; @@ -28,6 +27,7 @@ public class DestinationRemoveRestartTest extends CombinationTestSupport { public byte destinationType; BrokerService broker; + @Override protected void setUp() throws Exception { broker = createBroker(); } @@ -40,6 +40,7 @@ public class DestinationRemoveRestartTest extends CombinationTestSupport { return broker; } + @Override protected void tearDown() throws Exception { broker.stop(); } @@ -63,7 +64,7 @@ public class DestinationRemoveRestartTest extends CombinationTestSupport { ActiveMQDestination amqDestination = ActiveMQDestination.createDestination(destinationName, destinationType); - broker.getRegionBroker().addDestination(broker.getAdminConnectionContext(), (ActiveMQDestination) amqDestination); + broker.getRegionBroker().addDestination(broker.getAdminConnectionContext(), amqDestination,true); final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations(); for (final ActiveMQDestination element : list) { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java index 7722e493c9..ee2b3fe9cb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java @@ -56,7 +56,7 @@ public class AMQ2571Test extends EmbeddedBrokerTestSupport { Thread sendingThread = new Thread(new Runnable() { public void run() { try { - for (int i = 0; i < 5000; i++) { + for (int i = 0; i < 100000; i++) { producerB.send(message); } } catch (JMSException e) { @@ -76,7 +76,7 @@ public class AMQ2571Test extends EmbeddedBrokerTestSupport { // Sleep for a while to make sure that we should know that the // TempQueue is gone. - Thread.sleep(5000); + //Thread.sleep(50); // Now we test if we are able to send again. try { diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java index 83d83bde8a..1efa17b157 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java @@ -40,7 +40,7 @@ public class SimpleQueueTest extends SimpleTopicTest { @Override protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { PerfConsumer consumer = new PerfConsumer(fac, dest); - consumer.setInitialDelay(10000); + //consumer.setInitialDelay(10000); //consumer.setSleepDuration(10); boolean enableAudit = numberOfConsumers <= 1; System.err.println("Enable Audit = " + enableAudit); diff --git a/activemq-core/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java b/activemq-core/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java index 9a2a8e071e..fc6ac58c19 100644 --- a/activemq-core/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java @@ -17,9 +17,7 @@ package org.apache.activemq.xbean; import java.net.URI; - import junit.framework.TestCase; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; @@ -54,7 +52,7 @@ public class XBeanConfigTest extends TestCase { // Validate the system properties are being evaluated in xbean. assertEquals("testbroker", brokerService.getBrokerName()); - Topic topic = (Topic)broker.addDestination(context, new ActiveMQTopic("FOO.BAR")); + Topic topic = (Topic)broker.addDestination(context, new ActiveMQTopic("FOO.BAR"),true); DispatchPolicy dispatchPolicy = topic.getDispatchPolicy(); assertTrue("dispatchPolicy should be RoundRobinDispatchPolicy: " + dispatchPolicy, dispatchPolicy instanceof RoundRobinDispatchPolicy); @@ -66,7 +64,7 @@ public class XBeanConfigTest extends TestCase { LOG.info("dispatchPolicy: " + dispatchPolicy); LOG.info("subscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy); - topic = (Topic)broker.addDestination(context, new ActiveMQTopic("ORDERS.BOOKS")); + topic = (Topic)broker.addDestination(context, new ActiveMQTopic("ORDERS.BOOKS"),true); dispatchPolicy = topic.getDispatchPolicy(); assertTrue("dispatchPolicy should be StrictOrderDispatchPolicy: " + dispatchPolicy, dispatchPolicy instanceof StrictOrderDispatchPolicy); @@ -81,6 +79,7 @@ public class XBeanConfigTest extends TestCase { LOG.info("subscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy); } + @Override protected void setUp() throws Exception { System.setProperty("brokername", "testbroker"); brokerService = createBroker(); @@ -106,6 +105,7 @@ public class XBeanConfigTest extends TestCase { assertNotNull("No broker created!"); } + @Override protected void tearDown() throws Exception { if (brokerService != null) { brokerService.stop();