git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905926 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-02-03 08:02:49 +00:00
parent 02cd1537c9
commit 204f91f935
20 changed files with 246 additions and 146 deletions

View File

@ -33,7 +33,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData; import javax.jms.ConnectionMetaData;
@ -41,6 +40,7 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueConnection; import javax.jms.QueueConnection;
@ -51,8 +51,7 @@ import javax.jms.Topic;
import javax.jms.TopicConnection; import javax.jms.TopicConnection;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.XAConnection; import javax.jms.XAConnection;
import javax.jms.InvalidDestinationException; import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -97,7 +96,6 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -182,9 +180,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Assume that protocol is the latest. Change to the actual protocol // Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received. // version when a WireFormatInfo is received.
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private long timeCreated; private final long timeCreated;
private ConnectionAudit connectionAudit = new ConnectionAudit(); private final ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource; private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object(); private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
@ -1906,12 +1904,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
activeTempDestinations.remove(destination); activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo(); DestinationInfo destInfo = new DestinationInfo();
info.setConnectionId(this.info.getConnectionId()); destInfo.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination); destInfo.setDestination(destination);
info.setTimeout(0); destInfo.setTimeout(0);
syncSendPacket(info); syncSendPacket(destInfo);
} }
public boolean isDeleted(ActiveMQDestination dest) { public boolean isDeleted(ActiveMQDestination dest) {
@ -2199,6 +2197,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.copyMessageOnSend = copyMessageOnSend; this.copyMessageOnSend = copyMessageOnSend;
} }
@Override
public String toString() { public String toString() {
return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
} }

View File

@ -20,7 +20,6 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -73,6 +72,7 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
} }
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
super.addConnection(context, info); super.addConnection(context, info);
@ -85,6 +85,7 @@ public class AdvisoryBroker extends BrokerFilter {
connections.put(copy.getConnectionId(), copy); connections.put(copy.getConnectionId(), copy);
} }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = super.addConsumer(context, info); Subscription answer = super.addConsumer(context, info);
@ -138,6 +139,7 @@ public class AdvisoryBroker extends BrokerFilter {
return answer; return answer;
} }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info); super.addProducer(context, info);
@ -149,8 +151,9 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { @Override
Destination answer = super.addDestination(context, destination); public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
Destination answer = super.addDestination(context, destination,create);
if (!AdvisorySupport.isAdvisoryTopic(destination)) { if (!AdvisorySupport.isAdvisoryTopic(destination)) {
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
DestinationInfo previous = destinations.putIfAbsent(destination, info); DestinationInfo previous = destinations.putIfAbsent(destination, info);
@ -162,6 +165,7 @@ public class AdvisoryBroker extends BrokerFilter {
return answer; return answer;
} }
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
next.addDestinationInfo(context, info); next.addDestinationInfo(context, info);
@ -175,6 +179,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
super.removeDestination(context, destination, timeout); super.removeDestination(context, destination, timeout);
DestinationInfo info = destinations.remove(destination); DestinationInfo info = destinations.remove(destination);
@ -195,6 +200,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
super.removeDestinationInfo(context, destInfo); super.removeDestinationInfo(context, destInfo);
DestinationInfo info = destinations.remove(destInfo.getDestination()); DestinationInfo info = destinations.remove(destInfo.getDestination());
@ -216,6 +222,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error); super.removeConnection(context, info, error);
@ -224,6 +231,7 @@ public class AdvisoryBroker extends BrokerFilter {
connections.remove(info.getConnectionId()); connections.remove(info.getConnectionId());
} }
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
super.removeConsumer(context, info); super.removeConsumer(context, info);
@ -238,6 +246,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info); super.removeProducer(context, info);
@ -252,6 +261,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void messageExpired(ConnectionContext context, MessageReference messageReference) { public void messageExpired(ConnectionContext context, MessageReference messageReference) {
super.messageExpired(context, messageReference); super.messageExpired(context, messageReference);
try { try {
@ -268,6 +278,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) { public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
super.messageConsumed(context, messageReference); super.messageConsumed(context, messageReference);
try { try {
@ -282,6 +293,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) { public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
super.messageDelivered(context, messageReference); super.messageDelivered(context, messageReference);
try { try {
@ -296,6 +308,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
super.messageDiscarded(context, messageReference); super.messageDiscarded(context, messageReference);
try { try {
@ -310,6 +323,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
super.slowConsumer(context, destination,subs); super.slowConsumer(context, destination,subs);
try { try {
@ -322,6 +336,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
super.fastProducer(context, producerInfo); super.fastProducer(context, producerInfo);
try { try {
@ -334,6 +349,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void isFull(ConnectionContext context,Destination destination,Usage usage) { public void isFull(ConnectionContext context,Destination destination,Usage usage) {
super.isFull(context,destination, usage); super.isFull(context,destination, usage);
try { try {
@ -346,6 +362,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void nowMasterBroker() { public void nowMasterBroker() {
super.nowMasterBroker(); super.nowMasterBroker();
try { try {

View File

@ -43,6 +43,7 @@ public class BrokerBroadcaster extends BrokerFilter {
super(next); super(next);
} }
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
next.acknowledge(consumerExchange, ack); next.acknowledge(consumerExchange, ack);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -51,6 +52,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info); next.addConnection(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -59,6 +61,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = next.addConsumer(context, info); Subscription answer = next.addConsumer(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -68,6 +71,7 @@ public class BrokerBroadcaster extends BrokerFilter {
return answer; return answer;
} }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info); next.addProducer(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -76,6 +80,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
next.commitTransaction(context, xid, onePhase); next.commitTransaction(context, xid, onePhase);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -84,6 +89,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
next.removeSubscription(context, info); next.removeSubscription(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -92,6 +98,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
int result = next.prepareTransaction(context, xid); int result = next.prepareTransaction(context, xid);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -102,6 +109,7 @@ public class BrokerBroadcaster extends BrokerFilter {
return result; return result;
} }
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
next.removeConnection(context, info, error); next.removeConnection(context, info, error);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -110,6 +118,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
next.removeConsumer(context, info); next.removeConsumer(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -118,6 +127,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.removeProducer(context, info); next.removeProducer(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -126,6 +136,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
next.rollbackTransaction(context, xid); next.rollbackTransaction(context, xid);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -134,6 +145,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
next.send(producerExchange, messageSend); next.send(producerExchange, messageSend);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -142,6 +154,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
next.beginTransaction(context, xid); next.beginTransaction(context, xid);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -150,6 +163,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
next.forgetTransaction(context, transactionId); next.forgetTransaction(context, transactionId);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -158,15 +172,17 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { @Override
Destination result = next.addDestination(context, destination); public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
Destination result = next.addDestination(context, destination,createIfTemporary);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) { for (int i = 0; i < brokers.length; i++) {
brokers[i].addDestination(context, destination); brokers[i].addDestination(context, destination,createIfTemporary);
} }
return result; return result;
} }
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout); next.removeDestination(context, destination, timeout);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -175,6 +191,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void start() throws Exception { public void start() throws Exception {
next.start(); next.start();
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -183,6 +200,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void stop() throws Exception { public void stop() throws Exception {
next.stop(); next.stop();
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -191,6 +209,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void addSession(ConnectionContext context, SessionInfo info) throws Exception { public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
next.addSession(context, info); next.addSession(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -199,6 +218,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
next.removeSession(context, info); next.removeSession(context, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -207,6 +227,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void gc() { public void gc() {
next.gc(); next.gc();
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();
@ -215,6 +236,7 @@ public class BrokerBroadcaster extends BrokerFilter {
} }
} }
@Override
public void addBroker(Connection connection, BrokerInfo info) { public void addBroker(Connection connection, BrokerInfo info) {
next.addBroker(connection, info); next.addBroker(connection, info);
Broker brokers[] = getListeners(); Broker brokers[] = getListeners();

View File

@ -139,8 +139,8 @@ public class BrokerFilter implements Broker {
return next.getClients(); return next.getClients();
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
return next.addDestination(context, destination); return next.addDestination(context, destination,createIfTemporary);
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

View File

@ -1418,7 +1418,7 @@ public class BrokerService implements Service {
* JMS name * JMS name
*/ */
public Destination getDestination(ActiveMQDestination destination) throws Exception { public Destination getDestination(ActiveMQDestination destination) throws Exception {
return getBroker().addDestination(getAdminConnectionContext(), destination); return getBroker().addDestination(getAdminConnectionContext(), destination,false);
} }
public void removeDestination(ActiveMQDestination destination) throws Exception { public void removeDestination(ActiveMQDestination destination) throws Exception {
@ -1886,7 +1886,7 @@ public class BrokerService implements Service {
ConnectionContext adminConnectionContext = getAdminConnectionContext(); ConnectionContext adminConnectionContext = getAdminConnectionContext();
for (int i = 0; i < destinations.length; i++) { for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination destination = destinations[i]; ActiveMQDestination destination = destinations[i];
getBroker().addDestination(adminConnectionContext, destination); getBroker().addDestination(adminConnectionContext, destination,true);
} }
} }
} }
@ -2054,7 +2054,7 @@ public class BrokerService implements Service {
} }
while (iter.hasNext()) { while (iter.hasNext()) {
ActiveMQDestination destination = (ActiveMQDestination) iter.next(); ActiveMQDestination destination = (ActiveMQDestination) iter.next();
broker.addDestination(adminConnectionContext, destination); broker.addDestination(adminConnectionContext, destination,false);
} }
} }
} }

View File

@ -134,7 +134,7 @@ public class EmptyBroker implements Broker {
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception {
return null; return null;
} }

View File

@ -137,7 +137,7 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }

View File

@ -145,8 +145,8 @@ public class MutableBrokerFilter implements Broker {
return getNext().getClients(); return getNext().getClients();
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
return getNext().addDestination(context, destination); return getNext().addDestination(context, destination,createIfTemporary);
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

View File

@ -23,9 +23,7 @@ import java.lang.reflect.Method;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -237,11 +235,11 @@ public class BrokerView implements BrokerViewMBean {
} }
public void addTopic(String name) throws Exception { public void addTopic(String name) throws Exception {
broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name)); broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name),true);
} }
public void addQueue(String name) throws Exception { public void addQueue(String name) throws Exception {
broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name)); broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name),true);
} }
public void removeTopic(String name) throws Exception { public void removeTopic(String name) throws Exception {

View File

@ -67,8 +67,8 @@ public abstract class AbstractRegion implements Region {
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>(); protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
protected boolean started; protected boolean started;
public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
DestinationFactory destinationFactory) { TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
if (broker == null) { if (broker == null) {
throw new IllegalArgumentException("null broker"); throw new IllegalArgumentException("null broker");
} }
@ -82,7 +82,7 @@ public abstract class AbstractRegion implements Region {
this.destinationFactory = destinationFactory; this.destinationFactory = destinationFactory;
} }
public final void start() throws Exception { public final void start() throws Exception {
started = true; started = true;
Set<ActiveMQDestination> inactiveDests = getInactiveDestinations(); Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
@ -92,7 +92,7 @@ public abstract class AbstractRegion implements Region {
ConnectionContext context = new ConnectionContext(); ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBrokerService().getBroker()); context.setBroker(broker.getBrokerService().getBroker());
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.getBroker().addDestination(context, dest); context.getBroker().addDestination(context, dest, false);
} }
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
@ -113,21 +113,27 @@ public abstract class AbstractRegion implements Region {
destinations.clear(); destinations.clear();
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {
LOG.debug(broker.getBrokerName() + " adding destination: " + destination); LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
Destination dest = destinations.get(destination); Destination dest = destinations.get(destination);
if (dest == null) { if (dest == null) {
dest = createDestination(context, destination); if (destination.isTemporary() == false || createIfTemporary) {
// intercept if there is a valid interceptor defined dest = createDestination(context, destination);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); // intercept if there is a valid interceptor defined
if (destinationInterceptor != null) { DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
dest = destinationInterceptor.intercept(dest); if (destinationInterceptor != null) {
dest = destinationInterceptor.intercept(dest);
}
dest.start();
destinations.put(destination, dest);
destinationMap.put(destination, dest);
addSubscriptionsForDestination(context, dest);
}
if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist.");
} }
dest.start();
destinations.put(destination, dest);
destinationMap.put(destination, dest);
addSubscriptionsForDestination(context, dest);
} }
return dest; return dest;
} }
@ -136,8 +142,9 @@ public abstract class AbstractRegion implements Region {
public Map<ConsumerId, Subscription> getSubscriptions() { public Map<ConsumerId, Subscription> getSubscriptions() {
return subscriptions; return subscriptions;
} }
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
throws Exception {
List<Subscription> rc = new ArrayList<Subscription>(); List<Subscription> rc = new ArrayList<Subscription>();
// Add all consumers that are interested in the destination. // Add all consumers that are interested in the destination.
@ -152,7 +159,8 @@ public abstract class AbstractRegion implements Region {
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
throws Exception {
// No timeout.. then try to shut down right way, fails if there are // No timeout.. then try to shut down right way, fails if there are
// current subscribers. // current subscribers.
@ -174,7 +182,7 @@ public abstract class AbstractRegion implements Region {
} }
LOG.debug("Removing destination: " + destination); LOG.debug("Removing destination: " + destination);
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
Destination dest = destinations.remove(destination); Destination dest = destinations.remove(destination);
if (dest != null) { if (dest != null) {
@ -187,13 +195,13 @@ public abstract class AbstractRegion implements Region {
} }
} }
destinationMap.removeAll(destination); destinationMap.removeAll(destination);
dispose(context,dest); dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) { if (destinationInterceptor != null) {
destinationInterceptor.remove(dest); destinationInterceptor.remove(dest);
} }
} else { } else {
LOG.debug("Destination doesn't exist: " + dest); LOG.debug("Destination doesn't exist: " + dest);
} }
} }
@ -217,11 +225,12 @@ public abstract class AbstractRegion implements Region {
} }
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null && !destination.isPattern() && !destination.isComposite()) { if (destination != null && !destination.isPattern() && !destination.isComposite()) {
// lets auto-create the destination // lets auto-create the destination
lookup(context, destination); lookup(context, destination,true);
} }
Object addGuard; Object addGuard;
@ -235,7 +244,8 @@ public abstract class AbstractRegion implements Region {
synchronized (addGuard) { synchronized (addGuard) {
Subscription o = subscriptions.get(info.getConsumerId()); Subscription o = subscriptions.get(info.getConsumerId());
if (o != null) { if (o != null) {
LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); LOG
.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
return o; return o;
} }
@ -268,20 +278,20 @@ public abstract class AbstractRegion implements Region {
// Add the subscription to all the matching queues. // Add the subscription to all the matching queues.
// But copy the matches first - to prevent deadlocks // But copy the matches first - to prevent deadlocks
List<Destination>addList = new ArrayList<Destination>(); List<Destination> addList = new ArrayList<Destination>();
synchronized(destinationsMutex) { synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination)iter.next(); Destination dest = (Destination) iter.next();
addList.add(dest); addList.add(dest);
} }
} }
for (Destination dest:addList) { for (Destination dest : addList) {
dest.addSubscription(context, sub); dest.addSubscription(context, sub);
} }
if (info.isBrowser()) { if (info.isBrowser()) {
((QueueBrowserSubscription)sub).destinationsAdded(); ((QueueBrowserSubscription) sub).destinationsAdded();
} }
return sub; return sub;
@ -309,24 +319,24 @@ public abstract class AbstractRegion implements Region {
} }
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
Subscription sub = subscriptions.remove(info.getConsumerId()); Subscription sub = subscriptions.remove(info.getConsumerId());
//The sub could be removed elsewhere - see ConnectionSplitBroker // The sub could be removed elsewhere - see ConnectionSplitBroker
if (sub != null) { if (sub != null) {
// remove the subscription from all the matching queues. // remove the subscription from all the matching queues.
List<Destination> removeList = new ArrayList<Destination>(); List<Destination> removeList = new ArrayList<Destination>();
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()) for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
.iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next(); Destination dest = (Destination) iter.next();
removeList.add(dest); removeList.add(dest);
} }
} }
for(Destination dest:removeList) { for (Destination dest : removeList) {
dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
} }
destroySubscription(sub); destroySubscription(sub);
@ -348,7 +358,7 @@ public abstract class AbstractRegion implements Region {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
final Destination regionDestination = lookup(context, messageSend.getDestination()); final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
producerExchange.setRegionDestination(regionDestination); producerExchange.setRegionDestination(regionDestination);
} }
@ -358,13 +368,11 @@ public abstract class AbstractRegion implements Region {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
Subscription sub = consumerExchange.getSubscription(); Subscription sub = consumerExchange.getSubscription();
if (sub == null) { if (sub == null) {
sub = subscriptions.get(ack.getConsumerId()); sub = subscriptions.get(ack.getConsumerId());
if (sub == null) { if (sub == null) {
if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
LOG.warn("Ack for non existent subscription, ack:" + ack); LOG.warn("Ack for non existent subscription, ack:" + ack);
throw new IllegalArgumentException( throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
"The subscription does not exist: "
+ ack.getConsumerId());
} else { } else {
return; return;
} }
@ -382,19 +390,19 @@ public abstract class AbstractRegion implements Region {
return sub.pullMessage(context, pull); return sub.pullMessage(context, pull);
} }
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
Destination dest = null; Destination dest = null;
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
dest = destinations.get(destination); dest = destinations.get(destination);
} }
if (dest == null) { if (dest == null) {
if (autoCreateDestinations) { if (isAutoCreateDestinations()) {
// Try to auto create the destination... re-invoke broker // Try to auto create the destination... re-invoke broker
// from the // from the
// top so that the proper security checks are performed. // top so that the proper security checks are performed.
try { try {
context.getBroker().addDestination(context, destination); context.getBroker().addDestination(context, destination, createTemporary);
dest = addDestination(context, destination); dest = addDestination(context, destination, false);
} catch (DestinationAlreadyExistsException e) { } catch (DestinationAlreadyExistsException e) {
// if the destination already exists then lets ignore // if the destination already exists then lets ignore
// this error // this error
@ -417,18 +425,19 @@ public abstract class AbstractRegion implements Region {
sub.processMessageDispatchNotification(messageDispatchNotification); sub.processMessageDispatchNotification(messageDispatchNotification);
} else { } else {
throw new JMSException("Slave broker out of sync with master - Subscription: " throw new JMSException("Slave broker out of sync with master - Subscription: "
+ messageDispatchNotification.getConsumerId() + messageDispatchNotification.getConsumerId() + " on "
+ " on " + messageDispatchNotification.getDestination() + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
+ " does not exist for dispatch of message: "
+ messageDispatchNotification.getMessageId()); + messageDispatchNotification.getMessageId());
} }
} }
/* /*
* For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
* the notification to ensure that the subscription chosen by the master is used. AMQ-2102 * dispatch is deferred till the notification to ensure that the
*/ * subscription chosen by the master is used. AMQ-2102
protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception { */
protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
throws Exception {
Destination dest = null; Destination dest = null;
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
dest = destinations.get(messageDispatchNotification.getDestination()); dest = destinations.get(messageDispatchNotification.getDestination());
@ -436,13 +445,10 @@ public abstract class AbstractRegion implements Region {
if (dest != null) { if (dest != null) {
dest.processDispatchNotification(messageDispatchNotification); dest.processDispatchNotification(messageDispatchNotification);
} else { } else {
throw new JMSException( throw new JMSException("Slave broker out of sync with master - Destination: "
"Slave broker out of sync with master - Destination: " + messageDispatchNotification.getDestination() + " does not exist for consumer "
+ messageDispatchNotification.getDestination() + messageDispatchNotification.getConsumerId() + " with message: "
+ " does not exist for consumer " + messageDispatchNotification.getMessageId());
+ messageDispatchNotification.getConsumerId()
+ " with message: "
+ messageDispatchNotification.getMessageId());
} }
} }
@ -461,7 +467,8 @@ public abstract class AbstractRegion implements Region {
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
throws Exception {
return destinationFactory.createDestination(context, destination, destinationStatistics); return destinationFactory.createDestination(context, destination, destinationStatistics);
} }
@ -472,8 +479,8 @@ public abstract class AbstractRegion implements Region {
public void setAutoCreateDestinations(boolean autoCreateDestinations) { public void setAutoCreateDestinations(boolean autoCreateDestinations) {
this.autoCreateDestinations = autoCreateDestinations; this.autoCreateDestinations = autoCreateDestinations;
} }
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next(); Destination dest = (Destination) iter.next();
@ -484,34 +491,37 @@ public abstract class AbstractRegion implements Region {
/** /**
* Removes a Producer. * Removes a Producer.
* @param context the environment the operation is being executed under. *
* @throws Exception TODO * @param context
* the environment the operation is being executed under.
* @throws Exception
* TODO
*/ */
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination)iter.next(); Destination dest = (Destination) iter.next();
dest.removeProducer(context, info); dest.removeProducer(context, info);
} }
} }
} }
protected void dispose(ConnectionContext context,Destination dest) throws Exception { protected void dispose(ConnectionContext context, Destination dest) throws Exception {
dest.dispose(context); dest.dispose(context);
dest.stop(); dest.stop();
destinationFactory.removeDestination(dest); destinationFactory.removeDestination(dest);
} }
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
ConsumerControl control) {
Subscription sub = subscriptions.get(control.getConsumerId()); Subscription sub = subscriptions.get(control.getConsumerId());
if (sub != null && sub instanceof AbstractSubscription) { if (sub != null && sub instanceof AbstractSubscription) {
((AbstractSubscription)sub).setPrefetchSize(control.getPrefetch()); ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " + control.getConsumerId()); LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
+ control.getConsumerId());
} }
try { try {
lookup(consumerExchange.getConnectionContext(), control.getDestination()).wakeup(); lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e); LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
} }

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.region;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ConsumerBrokerExchange;
@ -49,10 +48,11 @@ public interface Region extends Service {
* *
* @param context * @param context
* @param destination the destination to create. * @param destination the destination to create.
* @param createIfTemporary
* @return TODO * @return TODO
* @throws Exception TODO * @throws Exception TODO
*/ */
Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception; Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception;
/** /**
* Used to destroy a destination. * Used to destroy a destination.

View File

@ -263,7 +263,7 @@ public class RegionBroker extends EmptyBroker {
} }
@Override @Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
Destination answer; Destination answer;
@ -274,16 +274,16 @@ public class RegionBroker extends EmptyBroker {
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination); answer = queueRegion.addDestination(context, destination,true);
break; break;
case ActiveMQDestination.TOPIC_TYPE: case ActiveMQDestination.TOPIC_TYPE:
answer = topicRegion.addDestination(context, destination); answer = topicRegion.addDestination(context, destination,true);
break; break;
case ActiveMQDestination.TEMP_QUEUE_TYPE: case ActiveMQDestination.TEMP_QUEUE_TYPE:
answer = tempQueueRegion.addDestination(context, destination); answer = tempQueueRegion.addDestination(context, destination,create);
break; break;
case ActiveMQDestination.TEMP_TOPIC_TYPE: case ActiveMQDestination.TEMP_TOPIC_TYPE:
answer = tempTopicRegion.addDestination(context, destination); answer = tempTopicRegion.addDestination(context, destination,create);
break; break;
default: default:
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
@ -321,7 +321,7 @@ public class RegionBroker extends EmptyBroker {
@Override @Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
addDestination(context, info.getDestination()); addDestination(context, info.getDestination(),true);
} }
@ -349,7 +349,7 @@ public class RegionBroker extends EmptyBroker {
if (destination != null) { if (destination != null) {
// This seems to cause the destination to be added but without advisories firing... // This seems to cause the destination to be added but without advisories firing...
context.getBroker().addDestination(context, destination); context.getBroker().addDestination(context, destination,false);
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info); queueRegion.addProducer(context, info);
@ -441,7 +441,7 @@ public class RegionBroker extends EmptyBroker {
if (producerExchange.isMutable() || producerExchange.getRegion() == null) { if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
ActiveMQDestination destination = message.getDestination(); ActiveMQDestination destination = message.getDestination();
// ensure the destination is registered with the RegionBroker // ensure the destination is registered with the RegionBroker
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination); producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
Region region; Region region;
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:

View File

@ -21,10 +21,8 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -59,12 +57,13 @@ public class TopicRegion extends AbstractRegion {
} }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.isDurable()) { if (info.isDurable()) {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (!destination.isPattern()) { if (!destination.isPattern()) {
// Make sure the destination is created. // Make sure the destination is created.
lookup(context, destination); lookup(context, destination,true);
} }
String clientId = context.getClientId(); String clientId = context.getClientId();
String subscriptionName = info.getSubscriptionName(); String subscriptionName = info.getSubscriptionName();
@ -113,6 +112,7 @@ public class TopicRegion extends AbstractRegion {
} }
} }
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.isDurable()) { if (info.isDurable()) {
@ -127,6 +127,7 @@ public class TopicRegion extends AbstractRegion {
} }
} }
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName()); SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key); DurableTopicSubscription sub = durableSubscriptions.get(key);
@ -151,6 +152,7 @@ public class TopicRegion extends AbstractRegion {
super.removeConsumer(context, sub.getConsumerInfo()); super.removeConsumer(context, sub.getConsumerInfo());
} }
@Override
public String toString() { public String toString() {
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
} }
@ -234,6 +236,7 @@ public class TopicRegion extends AbstractRegion {
} }
} }
@Override
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
@ -290,6 +293,7 @@ public class TopicRegion extends AbstractRegion {
return !info1.getDestination().equals(info2.getDestination()); return !info1.getDestination().equals(info2.getDestination());
} }
@Override
protected Set<ActiveMQDestination> getInactiveDestinations() { protected Set<ActiveMQDestination> getInactiveDestinations() {
Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.util;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -147,6 +146,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
this.logInternalEvents = logInternalEvents; this.logInternalEvents = logInternalEvents;
} }
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, public void acknowledge(ConsumerBrokerExchange consumerExchange,
MessageAck ack) throws Exception { MessageAck ack) throws Exception {
if (isLogAll() || isLogConsumerEvents()) { if (isLogAll() || isLogConsumerEvents()) {
@ -162,6 +162,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.acknowledge(consumerExchange, ack); super.acknowledge(consumerExchange, ack);
} }
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception { throws Exception {
if (isLogAll() || isLogConsumerEvents()) { if (isLogAll() || isLogConsumerEvents()) {
@ -171,6 +172,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return super.messagePull(context, pull); return super.messagePull(context, pull);
} }
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) public void addConnection(ConnectionContext context, ConnectionInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogConnectionEvents()) { if (isLogAll() || isLogConnectionEvents()) {
@ -179,6 +181,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.addConnection(context, info); super.addConnection(context, info);
} }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogConsumerEvents()) { if (isLogAll() || isLogConsumerEvents()) {
@ -187,6 +190,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return super.addConsumer(context, info); return super.addConsumer(context, info);
} }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogProducerEvents()) { if (isLogAll() || isLogProducerEvents()) {
@ -195,6 +199,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.addProducer(context, info); super.addProducer(context, info);
} }
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, public void commitTransaction(ConnectionContext context, TransactionId xid,
boolean onePhase) throws Exception { boolean onePhase) throws Exception {
if (isLogAll() || isLogTransactionEvents()) { if (isLogAll() || isLogTransactionEvents()) {
@ -203,6 +208,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.commitTransaction(context, xid, onePhase); super.commitTransaction(context, xid, onePhase);
} }
@Override
public void removeSubscription(ConnectionContext context, public void removeSubscription(ConnectionContext context,
RemoveSubscriptionInfo info) throws Exception { RemoveSubscriptionInfo info) throws Exception {
if (isLogAll() || isLogConsumerEvents()) { if (isLogAll() || isLogConsumerEvents()) {
@ -211,6 +217,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeSubscription(context, info); super.removeSubscription(context, info);
} }
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context) public TransactionId[] getPreparedTransactions(ConnectionContext context)
throws Exception { throws Exception {
@ -228,6 +235,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return result; return result;
} }
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) public int prepareTransaction(ConnectionContext context, TransactionId xid)
throws Exception { throws Exception {
if (isLogAll() || isLogTransactionEvents()) { if (isLogAll() || isLogTransactionEvents()) {
@ -236,6 +244,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return super.prepareTransaction(context, xid); return super.prepareTransaction(context, xid);
} }
@Override
public void removeConnection(ConnectionContext context, public void removeConnection(ConnectionContext context,
ConnectionInfo info, Throwable error) throws Exception { ConnectionInfo info, Throwable error) throws Exception {
if (isLogAll() || isLogConnectionEvents()) { if (isLogAll() || isLogConnectionEvents()) {
@ -244,6 +253,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeConnection(context, info, error); super.removeConnection(context, info, error);
} }
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogConsumerEvents()) { if (isLogAll() || isLogConsumerEvents()) {
@ -252,6 +262,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeConsumer(context, info); super.removeConsumer(context, info);
} }
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogProducerEvents()) { if (isLogAll() || isLogProducerEvents()) {
@ -260,6 +271,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeProducer(context, info); super.removeProducer(context, info);
} }
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) public void rollbackTransaction(ConnectionContext context, TransactionId xid)
throws Exception { throws Exception {
if (isLogAll() || isLogTransactionEvents()) { if (isLogAll() || isLogTransactionEvents()) {
@ -268,6 +280,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.rollbackTransaction(context, xid); super.rollbackTransaction(context, xid);
} }
@Override
public void send(ProducerBrokerExchange producerExchange, public void send(ProducerBrokerExchange producerExchange,
Message messageSend) throws Exception { Message messageSend) throws Exception {
if (isLogAll() || isLogProducerEvents()) { if (isLogAll() || isLogProducerEvents()) {
@ -276,6 +289,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.send(producerExchange, messageSend); super.send(producerExchange, messageSend);
} }
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) public void beginTransaction(ConnectionContext context, TransactionId xid)
throws Exception { throws Exception {
if (isLogAll() || isLogTransactionEvents()) { if (isLogAll() || isLogTransactionEvents()) {
@ -284,6 +298,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.beginTransaction(context, xid); super.beginTransaction(context, xid);
} }
@Override
public void forgetTransaction(ConnectionContext context, public void forgetTransaction(ConnectionContext context,
TransactionId transactionId) throws Exception { TransactionId transactionId) throws Exception {
if (isLogAll() || isLogTransactionEvents()) { if (isLogAll() || isLogTransactionEvents()) {
@ -293,6 +308,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.forgetTransaction(context, transactionId); super.forgetTransaction(context, transactionId);
} }
@Override
public Connection[] getClients() throws Exception { public Connection[] getClients() throws Exception {
Connection[] result = super.getClients(); Connection[] result = super.getClients();
@ -311,17 +327,19 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return super.getClients(); return super.getClients();
} }
@Override
public org.apache.activemq.broker.region.Destination addDestination( public org.apache.activemq.broker.region.Destination addDestination(
ConnectionContext context, ActiveMQDestination destination) ConnectionContext context, ActiveMQDestination destination,boolean create)
throws Exception { throws Exception {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
LOG.info("Adding destination : " LOG.info("Adding destination : "
+ destination.getDestinationTypeAsString() + ":" + destination.getDestinationTypeAsString() + ":"
+ destination.getPhysicalName()); + destination.getPhysicalName());
} }
return super.addDestination(context, destination); return super.addDestination(context, destination,create);
} }
@Override
public void removeDestination(ConnectionContext context, public void removeDestination(ConnectionContext context,
ActiveMQDestination destination, long timeout) throws Exception { ActiveMQDestination destination, long timeout) throws Exception {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -332,6 +350,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeDestination(context, destination, timeout); super.removeDestination(context, destination, timeout);
} }
@Override
public ActiveMQDestination[] getDestinations() throws Exception { public ActiveMQDestination[] getDestinations() throws Exception {
ActiveMQDestination[] result = super.getDestinations(); ActiveMQDestination[] result = super.getDestinations();
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -349,6 +368,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return result; return result;
} }
@Override
public void start() throws Exception { public void start() throws Exception {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
LOG.info("Starting " + getBrokerName()); LOG.info("Starting " + getBrokerName());
@ -356,6 +376,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.start(); super.start();
} }
@Override
public void stop() throws Exception { public void stop() throws Exception {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
LOG.info("Stopping " + getBrokerName()); LOG.info("Stopping " + getBrokerName());
@ -363,6 +384,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.stop(); super.stop();
} }
@Override
public void addSession(ConnectionContext context, SessionInfo info) public void addSession(ConnectionContext context, SessionInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogConnectionEvents()) { if (isLogAll() || isLogConnectionEvents()) {
@ -371,6 +393,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.addSession(context, info); super.addSession(context, info);
} }
@Override
public void removeSession(ConnectionContext context, SessionInfo info) public void removeSession(ConnectionContext context, SessionInfo info)
throws Exception { throws Exception {
if (isLogAll() || isLogConnectionEvents()) { if (isLogAll() || isLogConnectionEvents()) {
@ -379,6 +402,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeSession(context, info); super.removeSession(context, info);
} }
@Override
public void addBroker(Connection connection, BrokerInfo info) { public void addBroker(Connection connection, BrokerInfo info) {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
LOG.info("Adding Broker " + info.getBrokerName()); LOG.info("Adding Broker " + info.getBrokerName());
@ -386,6 +410,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.addBroker(connection, info); super.addBroker(connection, info);
} }
@Override
public void removeBroker(Connection connection, BrokerInfo info) { public void removeBroker(Connection connection, BrokerInfo info) {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
LOG.info("Removing Broker " + info.getBrokerName()); LOG.info("Removing Broker " + info.getBrokerName());
@ -393,6 +418,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeBroker(connection, info); super.removeBroker(connection, info);
} }
@Override
public BrokerInfo[] getPeerBrokerInfos() { public BrokerInfo[] getPeerBrokerInfos() {
BrokerInfo[] result = super.getPeerBrokerInfos(); BrokerInfo[] result = super.getPeerBrokerInfos();
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -410,6 +436,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return result; return result;
} }
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) { public void preProcessDispatch(MessageDispatch messageDispatch) {
if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
LOG.info("preProcessDispatch :" + messageDispatch); LOG.info("preProcessDispatch :" + messageDispatch);
@ -417,6 +444,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.preProcessDispatch(messageDispatch); super.preProcessDispatch(messageDispatch);
} }
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) { public void postProcessDispatch(MessageDispatch messageDispatch) {
if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
LOG.info("postProcessDispatch :" + messageDispatch); LOG.info("postProcessDispatch :" + messageDispatch);
@ -424,6 +452,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.postProcessDispatch(messageDispatch); super.postProcessDispatch(messageDispatch);
} }
@Override
public void processDispatchNotification( public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) MessageDispatchNotification messageDispatchNotification)
throws Exception { throws Exception {
@ -434,6 +463,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.processDispatchNotification(messageDispatchNotification); super.processDispatchNotification(messageDispatchNotification);
} }
@Override
public Set<ActiveMQDestination> getDurableDestinations() { public Set<ActiveMQDestination> getDurableDestinations() {
Set<ActiveMQDestination> result = super.getDurableDestinations(); Set<ActiveMQDestination> result = super.getDurableDestinations();
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -451,6 +481,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
return result; return result;
} }
@Override
public void addDestinationInfo(ConnectionContext context, public void addDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception { DestinationInfo info) throws Exception {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -459,6 +490,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.addDestinationInfo(context, info); super.addDestinationInfo(context, info);
} }
@Override
public void removeDestinationInfo(ConnectionContext context, public void removeDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception { DestinationInfo info) throws Exception {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -467,6 +499,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.removeDestinationInfo(context, info); super.removeDestinationInfo(context, info);
} }
@Override
public void messageExpired(ConnectionContext context, public void messageExpired(ConnectionContext context,
MessageReference message) { MessageReference message) {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -480,6 +513,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.messageExpired(context, message); super.messageExpired(context, message);
} }
@Override
public void sendToDeadLetterQueue(ConnectionContext context, public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference) { MessageReference messageReference) {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -492,6 +526,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
} }
} }
@Override
public void fastProducer(ConnectionContext context, public void fastProducer(ConnectionContext context,
ProducerInfo producerInfo) { ProducerInfo producerInfo) {
if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
@ -500,6 +535,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.fastProducer(context, producerInfo); super.fastProducer(context, producerInfo);
} }
@Override
public void isFull(ConnectionContext context, Destination destination, public void isFull(ConnectionContext context, Destination destination,
Usage usage) { Usage usage) {
if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
@ -508,6 +544,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.isFull(context, destination, usage); super.isFull(context, destination, usage);
} }
@Override
public void messageConsumed(ConnectionContext context, public void messageConsumed(ConnectionContext context,
MessageReference messageReference) { MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
@ -521,6 +558,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.messageConsumed(context, messageReference); super.messageConsumed(context, messageReference);
} }
@Override
public void messageDelivered(ConnectionContext context, public void messageDelivered(ConnectionContext context,
MessageReference messageReference) { MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
@ -534,6 +572,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.messageDelivered(context, messageReference); super.messageDelivered(context, messageReference);
} }
@Override
public void messageDiscarded(ConnectionContext context, public void messageDiscarded(ConnectionContext context,
MessageReference messageReference) { MessageReference messageReference) {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
@ -547,6 +586,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.messageDiscarded(context, messageReference); super.messageDiscarded(context, messageReference);
} }
@Override
public void slowConsumer(ConnectionContext context, public void slowConsumer(ConnectionContext context,
Destination destination, Subscription subs) { Destination destination, Subscription subs) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
@ -561,6 +601,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.slowConsumer(context, destination, subs); super.slowConsumer(context, destination, subs);
} }
@Override
public void nowMasterBroker() { public void nowMasterBroker() {
if (isLogAll() || isLogInternalEvents()) { if (isLogAll() || isLogInternalEvents()) {
LOG.info("Is now the master broker : " + getBrokerName()); LOG.info("Is now the master broker : " + getBrokerName());
@ -568,6 +609,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport implements
super.nowMasterBroker(); super.nowMasterBroker();
} }
@Override
public String toString() { public String toString() {
StringBuffer buf = new StringBuffer(); StringBuffer buf = new StringBuffer();
buf.append("LoggingBrokerPlugin("); buf.append("LoggingBrokerPlugin(");

View File

@ -19,7 +19,6 @@ package org.apache.activemq.broker.view;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
@ -38,17 +37,20 @@ public class DestinationDotFileInterceptor extends DotFileInterceptorSupport {
super(next, file); super(next, file);
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { @Override
Destination answer = super.addDestination(context, destination); public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
Destination answer = super.addDestination(context, destination,create);
generateFile(); generateFile();
return answer; return answer;
} }
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
super.removeDestination(context, destination, timeout); super.removeDestination(context, destination, timeout);
generateFile(); generateFile();
} }
@Override
protected void generateFile(PrintWriter writer) throws Exception { protected void generateFile(PrintWriter writer) throws Exception {
ActiveMQDestination[] destinations = getDestinations(); ActiveMQDestination[] destinations = getDestinations();

View File

@ -17,7 +17,6 @@
package org.apache.activemq.security; package org.apache.activemq.security;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -47,20 +46,22 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
this.authorizationMap = authorizationMap; this.authorizationMap = authorizationMap;
} }
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
addDestination(context, info.getDestination()); addDestination(context, info.getDestination(),true);
super.addDestinationInfo(context, info); super.addDestinationInfo(context, info);
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { @Override
final SecurityContext securityContext = (SecurityContext)context.getSecurityContext(); public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
final SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) { if (securityContext == null) {
throw new SecurityException("User is not authenticated."); throw new SecurityException("User is not authenticated.");
} }
Destination existing = this.getDestinationMap().get(destination); Destination existing = this.getDestinationMap().get(destination);
if (existing != null) { if (existing != null) {
return super.addDestination(context, destination); return super.addDestination(context, destination,create);
} }
if (!securityContext.isBrokerContext()) { if (!securityContext.isBrokerContext()) {
@ -77,12 +78,13 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
} }
return super.addDestination(context, destination); return super.addDestination(context, destination,create);
} }
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
final SecurityContext securityContext = (SecurityContext)context.getSecurityContext(); final SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) { if (securityContext == null) {
throw new SecurityException("User is not authenticated."); throw new SecurityException("User is not authenticated.");
} }
@ -99,9 +101,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
super.removeDestination(context, destination, timeout); super.removeDestination(context, destination, timeout);
} }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
final SecurityContext subject = (SecurityContext)context.getSecurityContext(); final SecurityContext subject = context.getSecurityContext();
if (subject == null) { if (subject == null) {
throw new SecurityException("User is not authenticated."); throw new SecurityException("User is not authenticated.");
} }
@ -141,9 +144,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
return super.addConsumer(context, info); return super.addConsumer(context, info);
} }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
SecurityContext subject = (SecurityContext)context.getSecurityContext(); SecurityContext subject = context.getSecurityContext();
if (subject == null) { if (subject == null) {
throw new SecurityException("User is not authenticated."); throw new SecurityException("User is not authenticated.");
} }
@ -164,8 +168,9 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
super.addProducer(context, info); super.addProducer(context, info);
} }
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
SecurityContext subject = (SecurityContext)producerExchange.getConnectionContext().getSecurityContext(); SecurityContext subject = producerExchange.getConnectionContext().getSecurityContext();
if (subject == null) { if (subject == null) {
throw new SecurityException("User is not authenticated."); throw new SecurityException("User is not authenticated.");
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -28,6 +27,7 @@ public class DestinationRemoveRestartTest extends CombinationTestSupport {
public byte destinationType; public byte destinationType;
BrokerService broker; BrokerService broker;
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
broker = createBroker(); broker = createBroker();
} }
@ -40,6 +40,7 @@ public class DestinationRemoveRestartTest extends CombinationTestSupport {
return broker; return broker;
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
broker.stop(); broker.stop();
} }
@ -63,7 +64,7 @@ public class DestinationRemoveRestartTest extends CombinationTestSupport {
ActiveMQDestination amqDestination = ActiveMQDestination amqDestination =
ActiveMQDestination.createDestination(destinationName, destinationType); ActiveMQDestination.createDestination(destinationName, destinationType);
broker.getRegionBroker().addDestination(broker.getAdminConnectionContext(), (ActiveMQDestination) amqDestination); broker.getRegionBroker().addDestination(broker.getAdminConnectionContext(), amqDestination,true);
final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations(); final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations();
for (final ActiveMQDestination element : list) { for (final ActiveMQDestination element : list) {

View File

@ -56,7 +56,7 @@ public class AMQ2571Test extends EmbeddedBrokerTestSupport {
Thread sendingThread = new Thread(new Runnable() { Thread sendingThread = new Thread(new Runnable() {
public void run() { public void run() {
try { try {
for (int i = 0; i < 5000; i++) { for (int i = 0; i < 100000; i++) {
producerB.send(message); producerB.send(message);
} }
} catch (JMSException e) { } catch (JMSException e) {
@ -76,7 +76,7 @@ public class AMQ2571Test extends EmbeddedBrokerTestSupport {
// Sleep for a while to make sure that we should know that the // Sleep for a while to make sure that we should know that the
// TempQueue is gone. // TempQueue is gone.
Thread.sleep(5000); //Thread.sleep(50);
// Now we test if we are able to send again. // Now we test if we are able to send again.
try { try {

View File

@ -40,7 +40,7 @@ public class SimpleQueueTest extends SimpleTopicTest {
@Override @Override
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer consumer = new PerfConsumer(fac, dest); PerfConsumer consumer = new PerfConsumer(fac, dest);
consumer.setInitialDelay(10000); //consumer.setInitialDelay(10000);
//consumer.setSleepDuration(10); //consumer.setSleepDuration(10);
boolean enableAudit = numberOfConsumers <= 1; boolean enableAudit = numberOfConsumers <= 1;
System.err.println("Enable Audit = " + enableAudit); System.err.println("Enable Audit = " + enableAudit);

View File

@ -17,9 +17,7 @@
package org.apache.activemq.xbean; package org.apache.activemq.xbean;
import java.net.URI; import java.net.URI;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -54,7 +52,7 @@ public class XBeanConfigTest extends TestCase {
// Validate the system properties are being evaluated in xbean. // Validate the system properties are being evaluated in xbean.
assertEquals("testbroker", brokerService.getBrokerName()); assertEquals("testbroker", brokerService.getBrokerName());
Topic topic = (Topic)broker.addDestination(context, new ActiveMQTopic("FOO.BAR")); Topic topic = (Topic)broker.addDestination(context, new ActiveMQTopic("FOO.BAR"),true);
DispatchPolicy dispatchPolicy = topic.getDispatchPolicy(); DispatchPolicy dispatchPolicy = topic.getDispatchPolicy();
assertTrue("dispatchPolicy should be RoundRobinDispatchPolicy: " + dispatchPolicy, dispatchPolicy instanceof RoundRobinDispatchPolicy); assertTrue("dispatchPolicy should be RoundRobinDispatchPolicy: " + dispatchPolicy, dispatchPolicy instanceof RoundRobinDispatchPolicy);
@ -66,7 +64,7 @@ public class XBeanConfigTest extends TestCase {
LOG.info("dispatchPolicy: " + dispatchPolicy); LOG.info("dispatchPolicy: " + dispatchPolicy);
LOG.info("subscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy); LOG.info("subscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy);
topic = (Topic)broker.addDestination(context, new ActiveMQTopic("ORDERS.BOOKS")); topic = (Topic)broker.addDestination(context, new ActiveMQTopic("ORDERS.BOOKS"),true);
dispatchPolicy = topic.getDispatchPolicy(); dispatchPolicy = topic.getDispatchPolicy();
assertTrue("dispatchPolicy should be StrictOrderDispatchPolicy: " + dispatchPolicy, dispatchPolicy instanceof StrictOrderDispatchPolicy); assertTrue("dispatchPolicy should be StrictOrderDispatchPolicy: " + dispatchPolicy, dispatchPolicy instanceof StrictOrderDispatchPolicy);
@ -81,6 +79,7 @@ public class XBeanConfigTest extends TestCase {
LOG.info("subscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy); LOG.info("subscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy);
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
System.setProperty("brokername", "testbroker"); System.setProperty("brokername", "testbroker");
brokerService = createBroker(); brokerService = createBroker();
@ -106,6 +105,7 @@ public class XBeanConfigTest extends TestCase {
assertNotNull("No broker created!"); assertNotNull("No broker created!");
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
if (brokerService != null) { if (brokerService != null) {
brokerService.stop(); brokerService.stop();