More checkstyle fixes

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@564057 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-08-09 00:34:07 +00:00
parent f812e34179
commit 74a7a8bbfc
724 changed files with 3525 additions and 4208 deletions

View File

@ -41,7 +41,6 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueConnection; import javax.jms.QueueConnection;
@ -290,8 +289,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
checkClosedOrFailed(); checkClosedOrFailed();
ensureConnectionInfoSent(); ensureConnectionInfoSent();
boolean doSessionAsync = alwaysSessionAsync || sessions.size() > 0 || transacted || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE; boolean doSessionAsync = alwaysSessionAsync || sessions.size() > 0 || transacted || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
return new ActiveMQSession(this, getNextSessionId(), (transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), dispatchAsync, alwaysSessionAsync); ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), dispatchAsync, alwaysSessionAsync);
} }
/** /**
@ -1168,10 +1167,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
Response response = (Response)this.transport.request(command); Response response = (Response)this.transport.request(command);
if (response.isException()) { if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response; ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException(); throw (JMSException)er.getException();
else } else {
throw JMSExceptionSupport.create(er.getException()); throw JMSExceptionSupport.create(er.getException());
}
} }
return response; return response;
} catch (IOException e) { } catch (IOException e) {
@ -1196,10 +1196,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
Response response = (Response)this.transport.request(command, timeout); Response response = (Response)this.transport.request(command, timeout);
if (response != null && response.isException()) { if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse)response; ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException(); throw (JMSException)er.getException();
else } else {
throw JMSExceptionSupport.create(er.getException()); throw JMSExceptionSupport.create(er.getException());
}
} }
return response; return response;
} catch (IOException e) { } catch (IOException e) {
@ -1361,9 +1362,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws IllegalStateException if the connection is in used. * @throws IllegalStateException if the connection is in used.
*/ */
public void changeUserInfo(String userName, String password) throws JMSException { public void changeUserInfo(String userName, String password) throws JMSException {
if (isConnectionInfoSentToBroker) if (isConnectionInfoSentToBroker) {
throw new IllegalStateException("changeUserInfo used Connection is not allowed"); throw new IllegalStateException("changeUserInfo used Connection is not allowed");
}
this.info.setUserName(userName); this.info.setUserName(userName);
this.info.setPassword(password); this.info.setPassword(password);
} }
@ -1374,8 +1375,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/ */
public String getResourceManagerId() throws JMSException { public String getResourceManagerId() throws JMSException {
waitForBrokerInfo(); waitForBrokerInfo();
if (brokerInfo == null) if (brokerInfo == null) {
throw new JMSException("Connection failed before Broker info was received."); throw new JMSException("Connection failed before Broker info was received.");
}
return brokerInfo.getBrokerId().getValue(); return brokerInfo.getBrokerId().getValue();
} }
@ -1580,8 +1582,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
onAsyncException(error.getException()); onAsyncException(error.getException());
} }
}); });
new Thread("Async error worker") {
}.start();
return null; return null;
} }
@ -1633,8 +1633,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (!closed.get() && !closing.get()) { if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) { if (this.exceptionListener != null) {
if (!(error instanceof JMSException)) if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error); error = JMSExceptionSupport.create(error);
}
final JMSException e = (JMSException)error; final JMSException e = (JMSException)error;
asyncConnectionThread.execute(new Runnable() { asyncConnectionThread.execute(new Runnable() {
@ -1744,8 +1745,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// If we are not watching the advisories.. then // If we are not watching the advisories.. then
// we will assume that the temp destination does exist. // we will assume that the temp destination does exist.
if (advisoryConsumer == null) if (advisoryConsumer == null) {
return false; return false;
}
return !activeTempDestinations.contains(dest); return !activeTempDestinations.contains(dest);
} }

View File

@ -16,6 +16,23 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.jndi.JNDIBaseStorable; import org.apache.activemq.jndi.JNDIBaseStorable;
import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.JMSStatsImpl;
@ -29,22 +46,6 @@ import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData; import org.apache.activemq.util.URISupport.CompositeData;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
/** /**
* A ConnectionFactory is an an Administered object, and is used for creating * A ConnectionFactory is an an Administered object, and is used for creating
* Connections. <p/> This class also implements QueueConnectionFactory and * Connections. <p/> This class also implements QueueConnectionFactory and

View File

@ -28,7 +28,7 @@ import javax.jms.ConnectionMetaData;
* the <CODE>Connection</CODE> object. * the <CODE>Connection</CODE> object.
*/ */
public class ActiveMQConnectionMetaData implements ConnectionMetaData { public final class ActiveMQConnectionMetaData implements ConnectionMetaData {
public static final String PROVIDER_VERSION; public static final String PROVIDER_VERSION;
public static final int PROVIDER_MAJOR_VERSION; public static final int PROVIDER_MAJOR_VERSION;

View File

@ -16,7 +16,29 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import org.apache.activemq.command.*; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.JMSConsumerStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
@ -29,18 +51,6 @@ import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.IllegalStateException;
import javax.jms.*;
import javax.jms.Message;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
* from a destination. A <CODE> MessageConsumer</CODE> object is created by * from a destination. A <CODE> MessageConsumer</CODE> object is created by
@ -612,7 +622,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
if ((session.isTransacted() || session.isDupsOkAcknowledge())) { if (session.isTransacted() || session.isDupsOkAcknowledge()) {
acknowledge(); acknowledge();
} }
if (session.isClientAcknowledge()) { if (session.isClientAcknowledge()) {
@ -885,7 +895,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || session.connection.isDuplicate(this, md.getMessage()) == false) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) { if (listener != null && unconsumedMessages.isRunning()) {
ActiveMQMessage message = createActiveMQMessage(md); ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md); beforeMessageIsConsumed(md);

View File

@ -16,6 +16,15 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
@ -26,14 +35,6 @@ import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* A client uses a <CODE>MessageProducer</CODE> object to send messages to a * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
* destination. A <CODE>MessageProducer</CODE> object is created by passing a * destination. A <CODE>MessageProducer</CODE> object is created by passing a
@ -139,14 +140,14 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
* to some internal error. * to some internal error.
*/ */
public void close() throws JMSException { public void close() throws JMSException {
if (closed == false) { if (!closed) {
dispose(); dispose();
this.session.asyncSendPacket(info.createRemoveCommand()); this.session.asyncSendPacket(info.createRemoveCommand());
} }
} }
public void dispose() { public void dispose() {
if (closed == false) { if (!closed) {
this.session.removeProducer(this); this.session.removeProducer(this);
closed = true; closed = true;
} }

View File

@ -16,9 +16,12 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import javax.jms.*; import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageProducer;
/** /**
* A useful base class for implementing a {@link MessageProducer} * A useful base class for implementing a {@link MessageProducer}
@ -194,7 +197,7 @@ public abstract class ActiveMQMessageProducerSupport implements MessageProducer,
* @see javax.jms.Message#DEFAULT_TIME_TO_LIVE * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
*/ */
public void setTimeToLive(long timeToLive) throws JMSException { public void setTimeToLive(long timeToLive) throws JMSException {
if (timeToLive < 0l) { if (timeToLive < 0L) {
throw new IllegalStateException("cannot set a negative timeToLive"); throw new IllegalStateException("cannot set a negative timeToLive");
} }
checkClosed(); checkClosed();

View File

@ -20,10 +20,10 @@ import java.util.Enumeration;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.MessageEOFException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MapMessage; import javax.jms.MapMessage;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.ObjectMessage; import javax.jms.ObjectMessage;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.StreamMessage; import javax.jms.StreamMessage;

View File

@ -72,7 +72,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
} }
public void close() throws IOException { public void close() throws IOException {
if (closed == false) { if (!closed) {
flushBuffer(); flushBuffer();
try { try {
// Send an EOS style empty message to signal EOS. // Send an EOS style empty message to signal EOS.
@ -86,7 +86,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
} }
public void dispose() { public void dispose() {
if (closed == false) { if (!closed) {
this.connection.removeOutputStream(this); this.connection.removeOutputStream(this);
closed = true; closed = true;
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq; package org.apache.activemq;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -28,8 +29,6 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
* queue without removing them. <p/> * queue without removing them. <p/>

View File

@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MapMessage; import javax.jms.MapMessage;
import javax.jms.Message; import javax.jms.Message;
@ -52,11 +50,32 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader; import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl; import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;

View File

@ -17,17 +17,12 @@
package org.apache.activemq; package org.apache.activemq;
import org.apache.activemq.command.ActiveMQDestination;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageFormatException;
import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.command.ActiveMQDestination;
/** /**
* A client uses a <CODE>TopicPublisher</CODE> object to publish messages on * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on

View File

@ -17,13 +17,13 @@
package org.apache.activemq; package org.apache.activemq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
/** /**
* A client uses a <CODE>TopicSubscriber</CODE> object to receive messages * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
* that have been published to a topic. A <CODE>TopicSubscriber</CODE> object * that have been published to a topic. A <CODE>TopicSubscriber</CODE> object

View File

@ -16,8 +16,7 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import org.apache.activemq.management.JMSStatsImpl; import java.net.URI;
import org.apache.activemq.transport.Transport;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.XAConnection; import javax.jms.XAConnection;
@ -27,7 +26,8 @@ import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnection; import javax.jms.XATopicConnection;
import javax.jms.XATopicConnectionFactory; import javax.jms.XATopicConnectionFactory;
import java.net.URI; import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.Transport;
/** /**
* A factory of {@link XAConnection} instances * A factory of {@link XAConnection} instances

View File

@ -16,11 +16,12 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import javax.jms.JMSException;
import java.net.URL;
import java.net.MalformedURLException;
import java.io.InputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import javax.jms.JMSException;
/** /**
* Represents a message which has a typically out of band Binary Large Object * Represents a message which has a typically out of band Binary Large Object

View File

@ -17,13 +17,13 @@
package org.apache.activemq; package org.apache.activemq;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.TopicSubscriber;
import javax.jms.QueueReceiver; import javax.jms.QueueReceiver;
import javax.jms.TopicPublisher;
import javax.jms.QueueSender; import javax.jms.QueueSender;
import javax.jms.JMSException; import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
/** /**
* Represents a hook to allow the support of custom destinations * Represents a hook to allow the support of custom destinations

View File

@ -16,13 +16,11 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import org.apache.activemq.command.ActiveMQMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
/** /**
* A plugin strategy for transforming a message before it is sent by the JMS client or before it is * A plugin strategy for transforming a message before it is sent by the JMS client or before it is

View File

@ -19,6 +19,7 @@ package org.apache.activemq;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.TransactionInProgressException; import javax.jms.TransactionInProgressException;
@ -29,20 +30,18 @@ import javax.transaction.xa.Xid;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse; import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse; import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId; import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* A TransactionContext provides the means to control a JMS transaction. It * A TransactionContext provides the means to control a JMS transaction. It
* provides a local transaction interface and also an XAResource interface. <p/> * provides a local transaction interface and also an XAResource interface. <p/>

View File

@ -16,8 +16,8 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
@ -42,8 +42,6 @@ import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* This broker filter handles tracking the state of the broker for purposes of * This broker filter handles tracking the state of the broker for purposes of
* publishing advisory messages to advisory consumers. * publishing advisory messages to advisory consumers.

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.Destination;
public class AdvisorySupport { public class AdvisorySupport {
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory."; public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
@ -102,8 +102,9 @@ public class AdvisorySupport {
return TEMP_QUEUE_ADVISORY_TOPIC; return TEMP_QUEUE_ADVISORY_TOPIC;
case ActiveMQDestination.TEMP_TOPIC_TYPE: case ActiveMQDestination.TEMP_TOPIC_TYPE:
return TEMP_TOPIC_ADVISORY_TOPIC; return TEMP_TOPIC_ADVISORY_TOPIC;
default:
throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
} }
throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
} }
public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) { public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import org.apache.activemq.command.ConsumerId; import java.util.EventObject;
import javax.jms.Destination; import javax.jms.Destination;
import java.util.EventObject; import org.apache.activemq.command.ConsumerId;
/** /**
* An event when the number of consumers on a given destination changes. * An event when the number of consumers on a given destination changes.

View File

@ -19,6 +19,14 @@ package org.apache.activemq.advisory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -29,14 +37,6 @@ import org.apache.activemq.command.RemoveInfo;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
/** /**
* An object which can be used to listen to the number of active consumers * An object which can be used to listen to the number of active consumers
* available on a given destination. * available on a given destination.
@ -82,27 +82,24 @@ public class ConsumerEventSource implements Service, MessageListener {
public void onMessage(Message message) { public void onMessage(Message message) {
if (message instanceof ActiveMQMessage) { if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage) message; ActiveMQMessage activeMessage = (ActiveMQMessage)message;
Object command = activeMessage.getDataStructure(); Object command = activeMessage.getDataStructure();
int count = 0; int count = 0;
if (command instanceof ConsumerInfo) { if (command instanceof ConsumerInfo) {
count = consumerCount.incrementAndGet(); count = consumerCount.incrementAndGet();
count = extractConsumerCountFromMessage(message, count); count = extractConsumerCountFromMessage(message, count);
fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo) command, count)); fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command, count));
} } else if (command instanceof RemoveInfo) {
else if (command instanceof RemoveInfo) { RemoveInfo removeInfo = (RemoveInfo)command;
RemoveInfo removeInfo = (RemoveInfo) command;
if (removeInfo.isConsumerRemove()) { if (removeInfo.isConsumerRemove()) {
count = consumerCount.decrementAndGet(); count = consumerCount.decrementAndGet();
count = extractConsumerCountFromMessage(message, count); count = extractConsumerCountFromMessage(message, count);
fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId) removeInfo.getObjectId(), count)); fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count));
} }
} } else {
else {
log.warn("Unknown command: " + command); log.warn("Unknown command: " + command);
} }
} } else {
else {
log.warn("Unknown message type: " + message + ". Message ignored"); log.warn("Unknown message type: " + message + ". Message ignored");
} }
} }
@ -116,12 +113,11 @@ public class ConsumerEventSource implements Service, MessageListener {
try { try {
Object value = message.getObjectProperty("consumerCount"); Object value = message.getObjectProperty("consumerCount");
if (value instanceof Number) { if (value instanceof Number) {
Number n = (Number) value; Number n = (Number)value;
return n.intValue(); return n.intValue();
} }
log.warn("No consumerCount header available on the message: " + message); log.warn("No consumerCount header available on the message: " + message);
} } catch (Exception e) {
catch (Exception e) {
log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e); log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
} }
return count; return count;

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import org.apache.activemq.command.ProducerId; import java.util.EventObject;
import javax.jms.Destination; import javax.jms.Destination;
import java.util.EventObject; import org.apache.activemq.command.ProducerId;
/** /**
* An event when the number of producers on a given destination changes. * An event when the number of producers on a given destination changes.

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -23,6 +26,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -32,8 +36,6 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveInfo;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* An object which can be used to listen to the number of active consumers * An object which can be used to listen to the number of active consumers
@ -80,27 +82,24 @@ public class ProducerEventSource implements Service, MessageListener {
public void onMessage(Message message) { public void onMessage(Message message) {
if (message instanceof ActiveMQMessage) { if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage) message; ActiveMQMessage activeMessage = (ActiveMQMessage)message;
Object command = activeMessage.getDataStructure(); Object command = activeMessage.getDataStructure();
int count = 0; int count = 0;
if (command instanceof ProducerInfo) { if (command instanceof ProducerInfo) {
count = producerCount.incrementAndGet(); count = producerCount.incrementAndGet();
count = extractProducerCountFromMessage(message, count); count = extractProducerCountFromMessage(message, count);
fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo) command, count)); fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)command, count));
} } else if (command instanceof RemoveInfo) {
else if (command instanceof RemoveInfo) { RemoveInfo removeInfo = (RemoveInfo)command;
RemoveInfo removeInfo = (RemoveInfo) command;
if (removeInfo.isProducerRemove()) { if (removeInfo.isProducerRemove()) {
count = producerCount.decrementAndGet(); count = producerCount.decrementAndGet();
count = extractProducerCountFromMessage(message, count); count = extractProducerCountFromMessage(message, count);
fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId) removeInfo.getObjectId(), count)); fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count));
} }
} } else {
else {
log.warn("Unknown command: " + command); log.warn("Unknown command: " + command);
} }
} } else {
else {
log.warn("Unknown message type: " + message + ". Message ignored"); log.warn("Unknown message type: " + message + ". Message ignored");
} }
} }
@ -109,12 +108,11 @@ public class ProducerEventSource implements Service, MessageListener {
try { try {
Object value = message.getObjectProperty("producerCount"); Object value = message.getObjectProperty("producerCount");
if (value instanceof Number) { if (value instanceof Number) {
Number n = (Number) value; Number n = (Number)value;
return n.intValue(); return n.intValue();
} }
log.warn("No producerCount header available on the message: " + message); log.warn("No producerCount header available on the message: " + message);
} } catch (Exception e) {
catch (Exception e) {
log.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); log.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
} }
return count; return count;

View File

@ -16,14 +16,15 @@
*/ */
package org.apache.activemq.blob; package org.apache.activemq.blob;
import org.apache.activemq.command.ActiveMQBlobMessage;
import javax.jms.JMSException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URL; import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
/** /**
* Represents a strategy of uploading a file/stream to some remote * Represents a strategy of uploading a file/stream to some remote
* *

View File

@ -16,14 +16,15 @@
*/ */
package org.apache.activemq.blob; package org.apache.activemq.blob;
import org.apache.activemq.command.ActiveMQBlobMessage;
import javax.jms.JMSException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URL; import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
/** /**
* A helper class to represent a required upload of a BLOB to some remote URL * A helper class to represent a required upload of a BLOB to some remote URL
* *
@ -35,7 +36,6 @@ public class BlobUploader {
private File file; private File file;
private InputStream in; private InputStream in;
public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) { public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) {
this.blobTransferPolicy = blobTransferPolicy; this.blobTransferPolicy = blobTransferPolicy;
this.in = in; this.in = in;
@ -49,13 +49,11 @@ public class BlobUploader {
public URL upload(ActiveMQBlobMessage message) throws JMSException, IOException { public URL upload(ActiveMQBlobMessage message) throws JMSException, IOException {
if (file != null) { if (file != null) {
return getStrategy().uploadFile(message, file); return getStrategy().uploadFile(message, file);
} } else {
else {
return getStrategy().uploadStream(message, in); return getStrategy().uploadStream(message, in);
} }
} }
public BlobTransferPolicy getBlobTransferPolicy() { public BlobTransferPolicy getBlobTransferPolicy() {
return blobTransferPolicy; return blobTransferPolicy;
} }

View File

@ -30,9 +30,11 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisoryBroker;
@ -77,10 +79,10 @@ import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View File

@ -16,9 +16,9 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
@ -29,8 +29,6 @@ import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext; import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transaction.Transaction; import org.apache.activemq.transaction.Transaction;
import java.io.IOException;
/** /**
* Used to hold context information needed to process requests sent to a broker. * Used to hold context information needed to process requests sent to a broker.
* *

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import org.apache.activemq.command.ActiveMQDestination;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
/** /**
* An exception thrown if a destination is attempted to be created when it already exists. * An exception thrown if a destination is attempted to be created when it already exists.
* *

View File

@ -16,18 +16,18 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import org.apache.activemq.util.IntrospectionSupport;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.net.MalformedURLException;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import org.apache.activemq.util.IntrospectionSupport;
/** /**
* A {@link BrokerFactoryHandler} which uses a properties file to configure the * A {@link BrokerFactoryHandler} which uses a properties file to configure the
* broker's various policies. * broker's various policies.

View File

@ -17,10 +17,6 @@
package org.apache.activemq.broker; package org.apache.activemq.broker;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.security.KeyManagementException; import java.security.KeyManagementException;
@ -29,6 +25,10 @@ import java.security.SecureRandom;
import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManager;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
/** /**
* A BrokerService that allows access to the key and trust managers used by SSL * A BrokerService that allows access to the key and trust managers used by SSL
* connections. There is no reason to use this class unless SSL is being used * connections. There is no reason to use this class unless SSL is being used

View File

@ -16,8 +16,15 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.LocalTransactionId;
@ -36,15 +43,6 @@ import org.apache.activemq.util.WrappedException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/** /**
* This broker filter handles the transaction related operations in the Broker * This broker filter handles the transaction related operations in the Broker
* interface. * interface.

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.ConnectionStatistics;
@ -97,8 +98,7 @@ import org.apache.commons.logging.LogFactory;
public class TransportConnection implements Service, Connection, Task, CommandVisitor { public class TransportConnection implements Service, Connection, Task, CommandVisitor {
private static final Log LOG = LogFactory.getLog(TransportConnection.class); private static final Log LOG = LogFactory.getLog(TransportConnection.class);
private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
+ ".Transport");
private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service"); private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
// Keeps track of the broker and connector that created this connection. // Keeps track of the broker and connector that created this connection.
protected final Broker broker; protected final Broker broker;
@ -191,8 +191,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
* @param taskRunnerFactory - can be null if you want direct dispatch to the * @param taskRunnerFactory - can be null if you want direct dispatch to the
* transport else commands are sent async. * transport else commands are sent async.
*/ */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory) {
TaskRunnerFactory taskRunnerFactory) {
this.connector = connector; this.connector = connector;
this.broker = broker; this.broker = broker;
RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class); RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
@ -272,8 +271,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
else if (e.getClass() == BrokerStoppedException.class) { else if (e.getClass() == BrokerStoppedException.class) {
if (!disposed.get()) { if (!disposed.get()) {
if (SERVICELOG.isDebugEnabled()) if (SERVICELOG.isDebugEnabled())
SERVICELOG SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
.debug("Broker has been stopped. Notifying client and closing his connection.");
ConnectionError ce = new ConnectionError(); ConnectionError ce = new ConnectionError();
ce.setException(e); ce.setException(e);
dispatchSync(ce); dispatchSync(ce);
@ -403,8 +401,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState == null) if (transactionState == null)
throw new IllegalStateException("Cannot prepare a transaction that had not been started: " throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + info.getTransactionId());
+ info.getTransactionId());
// Avoid dups. // Avoid dups.
if (!transactionState.isPrepared()) { if (!transactionState.isPrepared()) {
transactionState.setPrepared(true); transactionState.setPrepared(true);
@ -473,8 +470,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
} }
public Response processMessageDispatchNotification(MessageDispatchNotification notification) public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
throws Exception {
broker.processDispatchNotification(notification); broker.processDispatchNotification(notification);
return null; return null;
} }
@ -503,9 +499,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
"Cannot add a producer to a session that had not been registered: "
+ sessionId);
// Avoid replaying dup commands // Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) { if (!ss.getProducerIds().contains(info.getProducerId())) {
broker.addProducer(cs.getContext(), info); broker.addProducer(cs.getContext(), info);
@ -524,9 +518,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
"Cannot remove a producer from a session that had not been registered: "
+ sessionId);
ProducerState ps = ss.removeProducer(id); ProducerState ps = ss.removeProducer(id);
if (ps == null) if (ps == null)
throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
@ -541,9 +533,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " + sessionId);
"Cannot add a consumer to a session that had not been registered: "
+ sessionId);
// Avoid replaying dup commands // Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) { if (!ss.getConsumerIds().contains(info.getConsumerId())) {
broker.addConsumer(cs.getContext(), info); broker.addConsumer(cs.getContext(), info);
@ -562,9 +552,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
"Cannot remove a consumer from a session that had not been registered: "
+ sessionId);
ConsumerState consumerState = ss.removeConsumer(id); ConsumerState consumerState = ss.removeConsumer(id);
if (consumerState == null) if (consumerState == null)
throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
@ -641,8 +629,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
if (state.getConnection() != this) { if (state.getConnection() != this) {
LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
state.getConnection().stop(); state.getConnection().stop();
LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " + state.getConnection().getRemoteAddress());
+ state.getConnection().getRemoteAddress());
state.setConnection(this); state.setConnection(this);
state.reset(info); state.reset(info);
} }
@ -765,8 +752,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
protected void processDispatch(Command command) throws IOException { protected void processDispatch(Command command) throws IOException {
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() ? command : null);
? command : null);
try { try {
if (!disposed.get()) { if (!disposed.get()) {
if (messageDispatch != null) { if (messageDispatch != null) {
@ -846,8 +832,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
transport.start(); transport.start();
if (taskRunnerFactory != null) { if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress());
+ getRemoteAddress());
} else { } else {
taskRunner = null; taskRunner = null;
} }
@ -1114,8 +1099,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri); Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport); Transport remoteBridgeTransport = new ResponseCorrelator(transport);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport);
remoteBridgeTransport);
// now turn duplex off this side // now turn duplex off this side
info.setDuplexConnection(false); info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true); duplexBridge.setCreatedByDuplex(true);
@ -1180,8 +1164,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
ProducerState producerState = ss.getProducerState(id); ProducerState producerState = ss.getProducerState(id);
if (producerState != null && producerState.getInfo() != null) { if (producerState != null && producerState.getInfo() != null) {
ProducerInfo info = producerState.getInfo(); ProducerInfo info = producerState.getInfo();
result.setMutable(info.getDestination() == null result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
|| info.getDestination().isComposite());
} }
} }
producerExchanges.put(id, result); producerExchanges.put(id, result);
@ -1285,8 +1268,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// //
// ///////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////
protected TransportConnectionState registerConnectionState(ConnectionId connectionId, protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) {
TransportConnectionState state) {
TransportConnectionState rc = connectionState; TransportConnectionState rc = connectionState;
connectionState = state; connectionState = state;
return rc; return rc;
@ -1309,44 +1291,35 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected TransportConnectionState lookupConnectionState(String connectionId) { protected TransportConnectionState lookupConnectionState(String connectionId) {
TransportConnectionState cs = connectionState; TransportConnectionState cs = connectionState;
if (cs == null) if (cs == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: " + connectionId);
"Cannot lookup a connectionId for a connection that had not been registered: "
+ connectionId);
return cs; return cs;
} }
protected TransportConnectionState lookupConnectionState(ConsumerId id) { protected TransportConnectionState lookupConnectionState(ConsumerId id) {
TransportConnectionState cs = connectionState; TransportConnectionState cs = connectionState;
if (cs == null) if (cs == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + id.getParentId().getParentId());
"Cannot lookup a consumer from a connection that had not been registered: "
+ id.getParentId().getParentId());
return cs; return cs;
} }
protected TransportConnectionState lookupConnectionState(ProducerId id) { protected TransportConnectionState lookupConnectionState(ProducerId id) {
TransportConnectionState cs = connectionState; TransportConnectionState cs = connectionState;
if (cs == null) if (cs == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + id.getParentId().getParentId());
"Cannot lookup a producer from a connection that had not been registered: "
+ id.getParentId().getParentId());
return cs; return cs;
} }
protected TransportConnectionState lookupConnectionState(SessionId id) { protected TransportConnectionState lookupConnectionState(SessionId id) {
TransportConnectionState cs = connectionState; TransportConnectionState cs = connectionState;
if (cs == null) if (cs == null)
throw new IllegalStateException( throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + id.getParentId());
"Cannot lookup a session from a connection that had not been registered: "
+ id.getParentId());
return cs; return cs;
} }
protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) { protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
TransportConnectionState cs = connectionState; TransportConnectionState cs = connectionState;
if (cs == null) if (cs == null)
throw new IllegalStateException("Cannot lookup a connection that had not been registered: " throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + connectionId);
+ connectionId);
return cs; return cs;
} }

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -40,8 +41,6 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* @version $Revision: 1.6 $ * @version $Revision: 1.6 $

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -24,9 +26,6 @@ import org.apache.activemq.ThreadPriorities;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.Iterator;
import java.util.Set;
/** /**
* Used to provide information on the status of the Connection * Used to provide information on the status of the Connection
* *

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -43,7 +44,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Connects a Slave Broker to a Master when using <a * Connects a Slave Broker to a Master when using <a

View File

@ -35,7 +35,6 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType; import javax.management.openmbean.TabularType;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.util.List;
import java.util.Map;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException; import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularData;
import java.util.List;
import java.util.Map;
public interface DestinationViewMBean { public interface DestinationViewMBean {

View File

@ -16,16 +16,14 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException; import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularData;
/** /**
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
*/ */
public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean{ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
/** /**
* @return name of the durable subscription name * @return name of the durable subscription name
*/ */
@ -48,7 +46,8 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean{
public TabularData browseAsTable() throws OpenDataException; public TabularData browseAsTable() throws OpenDataException;
/** /**
* Destroys the durable subscription so that messages will no longer be stored for this subscription * Destroys the durable subscription so that messages will no longer be
* stored for this subscription
*/ */
public void destroy() throws Exception; public void destroy() throws Exception;
} }

View File

@ -23,8 +23,10 @@ import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.management.InstanceNotFoundException; import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer; import javax.management.MBeanServer;
@ -70,9 +72,6 @@ import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker { public class ManagedRegionBroker extends RegionBroker {
private static final Log log = LogFactory.getLog(ManagedRegionBroker.class); private static final Log log = LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer; private final MBeanServer mbeanServer;

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.Hashtable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
@ -28,12 +34,6 @@ import org.apache.activemq.util.JMXSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.util.Hashtable;
/** /**
* A managed transport connection * A managed transport connection
* *

View File

@ -16,18 +16,18 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.net.URISyntaxException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.net.URISyntaxException;
/** /**
* A managed transport connector which can create multiple managed connections * A managed transport connector which can create multiple managed connections
* as clients connect. * as clients connect.

View File

@ -17,11 +17,12 @@
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.rmi.registry.LocateRegistry; import java.rmi.registry.LocateRegistry;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Attribute; import javax.management.Attribute;
import javax.management.JMException; import javax.management.JMException;
import javax.management.MBeanServer; import javax.management.MBeanServer;
@ -31,11 +32,10 @@ import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory; import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL; import javax.management.remote.JMXServiceURL;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.util.ClassLoading;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A Flow provides different dispatch policies within the NMR * A Flow provides different dispatch policies within the NMR

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import javax.jms.InvalidSelectorException;
/** /**
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
*/ */

View File

@ -16,11 +16,7 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
public class TopicView extends DestinationView implements TopicViewMBean { public class TopicView extends DestinationView implements TopicViewMBean {

View File

@ -16,12 +16,13 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.IOException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -36,8 +37,6 @@ import org.apache.activemq.selector.SelectorParser;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CopyOnWriteArrayList;
abstract public class AbstractSubscription implements Subscription { abstract public class AbstractSubscription implements Subscription {
static private final Log log = LogFactory.getLog(AbstractSubscription.class); static private final Log log = LogFactory.getLog(AbstractSubscription.class);

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
@ -64,12 +63,10 @@ public class ConnectionStatistics extends StatsImpl {
if (parent != null) { if (parent != null) {
enqueues.setParent(parent.getEnqueues()); enqueues.setParent(parent.getEnqueues());
dequeues.setParent(parent.getDequeues()); dequeues.setParent(parent.getDequeues());
} } else {
else {
enqueues.setParent(null); enqueues.setParent(null);
dequeues.setParent(null); dequeues.setParent(null);
} }
} }
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import org.apache.activemq.command.Message;
import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl; import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import org.apache.activemq.broker.ConnectionContext;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
/** /**
* Represents a filter on message references * Represents a filter on message references
* *

View File

@ -17,14 +17,14 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -36,7 +36,6 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View File

@ -16,17 +16,17 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.util.Iterator;
import java.util.Set;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import javax.jms.InvalidSelectorException;
import java.util.Iterator;
import java.util.Set;
/** /**
* *
* @version $Revision: 1.9 $ * @version $Revision: 1.9 $

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ConsumerBrokerExchange;
@ -29,9 +32,6 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import java.util.Map;
import java.util.Set;
/** /**
* A Region is used to implement the different QOS options available to * A Region is used to implement the different QOS options available to
* a broker. A Broker is composed of multiple message processing Regions that * a broker. A Broker is composed of multiple message processing Regions that

View File

@ -18,6 +18,9 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -27,9 +30,6 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
/** /**
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
*/ */

View File

@ -18,7 +18,7 @@ package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTempDestination;

View File

@ -15,9 +15,8 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -31,7 +30,7 @@ import org.apache.commons.logging.LogFactory;
*/ */
public class TempTopicRegion extends AbstractRegion { public class TempTopicRegion extends AbstractRegion {
private static final Log log = LogFactory.getLog(TempTopicRegion.class); private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) { DestinationFactory destinationFactory) {
@ -58,7 +57,7 @@ public class TempTopicRegion extends AbstractRegion {
answer.init(); answer.init();
return answer; return answer;
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to create TopicSubscription ", e); LOG.error("Failed to create TopicSubscription ", e);
JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
jmsEx.setLinkedException(e); jmsEx.setLinkedException(e);
throw jmsEx; throw jmsEx;

View File

@ -17,8 +17,9 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;

View File

@ -23,15 +23,16 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
/** /**
* Interface to pending message (messages awaiting disptach to a consumer) cursor * Interface to pending message (messages awaiting disptach to a consumer)
* cursor
* *
* @version $Revision$ * @version $Revision$
*/ */
public interface PendingMessageCursor extends Service{ public interface PendingMessageCursor extends Service {
/** /**
* Add a destination * Add a destination
*
* @param context * @param context
* @param destination * @param destination
* @throws Exception * @throws Exception
@ -40,11 +41,13 @@ public interface PendingMessageCursor extends Service{
/** /**
* remove a destination * remove a destination
*
* @param context * @param context
* @param destination * @param destination
* @throws Exception * @throws Exception
*/ */
public void remove(ConnectionContext context, Destination destination) throws Exception; public void remove(ConnectionContext context, Destination destination) throws Exception;
/** /**
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
@ -52,6 +55,7 @@ public interface PendingMessageCursor extends Service{
/** /**
* check if a Destination is Empty for this cursor * check if a Destination is Empty for this cursor
*
* @param destination * @param destination
* @return true id the Destination is empty * @return true id the Destination is empty
*/ */
@ -59,27 +63,27 @@ public interface PendingMessageCursor extends Service{
/** /**
* reset the cursor * reset the cursor
*
*/ */
public void reset(); public void reset();
/** /**
* hint to the cursor to release any locks it might have * hint to the cursor to release any locks it might have grabbed after a
* grabbed after a reset * reset
*
*/ */
public void release(); public void release();
/** /**
* add message to await dispatch * add message to await dispatch
*
* @param node * @param node
* @throws IOException * @throws IOException
* @throws Exception * @throws Exception
*/ */
public void addMessageLast(MessageReference node) throws Exception; public void addMessageLast(MessageReference node) throws Exception;
/** /**
* add message to await dispatch * add message to await dispatch
*
* @param node * @param node
* @throws Exception * @throws Exception
*/ */
@ -87,6 +91,7 @@ public interface PendingMessageCursor extends Service{
/** /**
* Add a message recovered from a retroactive policy * Add a message recovered from a retroactive policy
*
* @param node * @param node
* @throws Exception * @throws Exception
*/ */
@ -104,7 +109,6 @@ public interface PendingMessageCursor extends Service{
/** /**
* remove the message at the cursor position * remove the message at the cursor position
*
*/ */
public void remove(); public void remove();
@ -115,13 +119,13 @@ public interface PendingMessageCursor extends Service{
/** /**
* clear all pending messages * clear all pending messages
*
*/ */
public void clear(); public void clear();
/** /**
* Informs the Broker if the subscription needs to intervention to recover it's state * Informs the Broker if the subscription needs to intervention to recover
* e.g. DurableTopicSubscriber may do * it's state e.g. DurableTopicSubscriber may do
*
* @return true if recovery required * @return true if recovery required
*/ */
public boolean isRecoveryRequired(); public boolean isRecoveryRequired();
@ -133,31 +137,32 @@ public interface PendingMessageCursor extends Service{
/** /**
* Set the max batch size * Set the max batch size
*
* @param maxBatchSize * @param maxBatchSize
*/ */
public void setMaxBatchSize(int maxBatchSize); public void setMaxBatchSize(int maxBatchSize);
/** /**
* Give the cursor a hint that we are about to remove * Give the cursor a hint that we are about to remove messages from memory
* messages from memory only * only
*/ */
public void resetForGC(); public void resetForGC();
/** /**
* remove a node * remove a node
*
* @param node * @param node
*/ */
public void remove(MessageReference node); public void remove(MessageReference node);
/** /**
* free up any internal buffers * free up any internal buffers
*
*/ */
public void gc(); public void gc();
/** /**
* Set the UsageManager * Set the UsageManager
*
* @param usageManager * @param usageManager
* @see org.apache.activemq.memory.UsageManager * @see org.apache.activemq.memory.UsageManager
*/ */
@ -173,13 +178,11 @@ public interface PendingMessageCursor extends Service{
*/ */
public int getMemoryUsageHighWaterMark(); public int getMemoryUsageHighWaterMark();
/** /**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/ */
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark); public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
/** /**
* @return true if the cursor is full * @return true if the cursor is full
*/ */
@ -192,16 +195,17 @@ public interface PendingMessageCursor extends Service{
/** /**
* destroy the cursor * destroy the cursor
*
* @throws Exception * @throws Exception
*/ */
public void destroy() throws Exception; public void destroy() throws Exception;
/** /**
* Page in a restricted number of messages * Page in a restricted number of messages
*
* @param maxItems * @param maxItems
* @return a list of paged in messages * @return a list of paged in messages
*/ */
public LinkedList pageInList(int maxItems); public LinkedList pageInList(int maxItems);
} }

View File

@ -19,6 +19,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;

View File

@ -80,8 +80,7 @@ public class MessageGroupHashBucket implements MessageGroupMap {
final MessageGroupSet answer = createMessageGroupSet(bucketNumber); final MessageGroupSet answer = createMessageGroupSet(bucketNumber);
if (parent == null) { if (parent == null) {
return answer; return answer;
} } else {
else {
// union the two sets together // union the two sets together
return new MessageGroupSet() { return new MessageGroupSet() {
public boolean contains(String groupID) { public boolean contains(String groupID) {
@ -103,7 +102,7 @@ public class MessageGroupHashBucket implements MessageGroupMap {
protected int getBucketNumber(String groupId) { protected int getBucketNumber(String groupId) {
int bucket = groupId.hashCode() % bucketCount; int bucket = groupId.hashCode() % bucketCount;
// bucket could be negative // bucket could be negative
if( bucket < 0 ) if (bucket < 0)
bucket *= -1; bucket *= -1;
return bucket; return bucket;
} }

View File

@ -16,13 +16,12 @@
*/ */
package org.apache.activemq.broker.region.group; package org.apache.activemq.broker.region.group;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import java.util.Iterator;
import java.util.Map;
/** /**
* A simple implementation which tracks every individual GroupID value but * A simple implementation which tracks every individual GroupID value but
* which can become a memory leak if clients die before they complete a message * which can become a memory leak if clients die before they complete a message

View File

@ -32,10 +32,10 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
boolean result = false; boolean result = false;
if (message != null) { if (message != null) {
result = true; result = true;
if (message.isPersistent() == false && processNonPersistent == false) { if (!message.isPersistent() && !processNonPersistent) {
result = false; result = false;
} }
if (message.isExpired() && processExpired == false) { if (message.isExpired() && !processExpired) {
result = false; result = false;
} }
} }

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.region.policy;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;

View File

@ -18,15 +18,14 @@ package org.apache.activemq.broker.region.policy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
/** /**
* This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed

View File

@ -25,7 +25,6 @@ import org.apache.activemq.command.ActiveMQTopic;
* DLQ using the subject naming hierarchy. * DLQ using the subject naming hierarchy.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
* @version $Revision$ * @version $Revision$
*/ */
public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
@ -38,8 +37,7 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) { public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) {
if (originalDestination.isQueue()) { if (originalDestination.isQueue()) {
return createDestination(originalDestination, queuePrefix, useQueueForQueueMessages); return createDestination(originalDestination, queuePrefix, useQueueForQueueMessages);
} } else {
else {
return createDestination(originalDestination, topicPrefix, useQueueForTopicMessages); return createDestination(originalDestination, topicPrefix, useQueueForTopicMessages);
} }
} }
@ -99,8 +97,7 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
String name = prefix + originalDestination.getPhysicalName(); String name = prefix + originalDestination.getPhysicalName();
if (useQueue) { if (useQueue) {
return new ActiveMQQueue(name); return new ActiveMQQueue(name);
} } else {
else {
return new ActiveMQTopic(name); return new ActiveMQTopic(name);
} }
} }

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.MessageReference;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.activemq.broker.region.MessageReference;
/** /**
* A strategy for evicting messages from slow consumers. * A strategy for evicting messages from slow consumers.
* *

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import javax.jms.MessageListener;
/** /**
* Represents some kind of query which will load initial messages from some source for a new topic subscriber. * Represents some kind of query which will load initial messages from some source for a new topic subscriber.
* *

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -28,12 +27,10 @@ import org.apache.activemq.command.Message;
* This SubscriptionRecoveryPolicy disable recovery of messages. * This SubscriptionRecoveryPolicy disable recovery of messages.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
* @version $Revision$ * @version $Revision$
*/ */
public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
public SubscriptionRecoveryPolicy copy() { public SubscriptionRecoveryPolicy copy() {
// This object is immutable // This object is immutable
return this; return this;
@ -52,7 +49,7 @@ public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy
public void stop() throws Exception { public void stop() throws Exception {
} }
public Message[] browse(ActiveMQDestination dest) throws Exception{ public Message[] browse(ActiveMQDestination dest) throws Exception {
return new Message[0]; return new Message[0];
} }

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.MessageReference;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.activemq.broker.region.MessageReference;
/** /**
* An eviction strategy which evicts the oldest message first (which is the * An eviction strategy which evicts the oldest message first (which is the
* default). * default).

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.MessageReference;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.activemq.broker.region.MessageReference;
/** /**
* An eviction strategy which evicts the oldest message with the lowest priority first. * An eviction strategy which evicts the oldest message with the lowest priority first.
* *

View File

@ -18,20 +18,20 @@ import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
/** /**
* Abstraction to allow different policies for holding messages awaiting dispatch on a Queue * Abstraction to allow different policies for holding messages awaiting
* dispatch on a Queue
* *
* @version $Revision$ * @version $Revision$
*/ */
public interface PendingQueueMessageStoragePolicy{ public interface PendingQueueMessageStoragePolicy {
/** /**
* Retrieve the configured pending message storage cursor; * Retrieve the configured pending message storage cursor;
*
* @param queue * @param queue
* @param tmpStore * @param tmpStore
* @return the cursor * @return the cursor
*
*/ */
public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore); public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore);
} }

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMap;
import java.util.List;
/** /**
* Represents a destination based configuration of policies so that individual * Represents a destination based configuration of policies so that individual
* destinations or wildcard hierarchies of destinations can be configured using * destinations or wildcard hierarchies of destinations can be configured using

View File

@ -16,22 +16,28 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQMessageTransformation; import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* This implementation of {@link SubscriptionRecoveryPolicy} will perform a user * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user
* specific query mechanism to load any messages they may have missed. * specific query mechanism to load any messages they may have missed.

View File

@ -20,7 +20,6 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.activemq.broker.region.virtual; package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.Destination;
import java.util.Collection; import java.util.Collection;
import org.apache.activemq.broker.region.Destination;
/** /**
* *
* @version $Revision$ * @version $Revision$

View File

@ -53,7 +53,7 @@ public class CompositeDestinationInterceptor extends DestinationFilter {
Object value = iter.next(); Object value = iter.next();
if (value instanceof FilteredDestination) { if (value instanceof FilteredDestination) {
FilteredDestination filteredDestination = (FilteredDestination) value; FilteredDestination filteredDestination = (FilteredDestination)value;
if (messageContext == null) { if (messageContext == null) {
messageContext = new MessageEvaluationContext(); messageContext = new MessageEvaluationContext();
messageContext.setMessageReference(message); messageContext.setMessageReference(message);
@ -62,9 +62,8 @@ public class CompositeDestinationInterceptor extends DestinationFilter {
if (filteredDestination.matches(messageContext)) { if (filteredDestination.matches(messageContext)) {
destination = filteredDestination.getDestination(); destination = filteredDestination.getDestination();
} }
} } else if (value instanceof ActiveMQDestination) {
else if (value instanceof ActiveMQDestination) { destination = (ActiveMQDestination)value;
destination = (ActiveMQDestination) value;
} }
if (destination == null) { if (destination == null) {
continue; continue;

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.broker.region.virtual; package org.apache.activemq.broker.region.virtual;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
/** /**
* Represents a destination which is filtered using some predicate such as a selector * Represents a destination which is filtered using some predicate such as a selector
* so that messages are only dispatched to the destination if they match the filter. * so that messages are only dispatched to the destination if they match the filter.

View File

@ -30,11 +30,9 @@ import org.apache.activemq.filter.DestinationMap;
/** /**
* Implements <a * Implements <a
* href="http://activemq.apache.org/virtual-destinations.html">Virtual * href="http://activemq.apache.org/virtual-destinations.html">Virtual Topics</a>.
* Topics</a>.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
* @version $Revision$ * @version $Revision$
*/ */
public class VirtualDestinationInterceptor implements DestinationInterceptor { public class VirtualDestinationInterceptor implements DestinationInterceptor {
@ -46,15 +44,14 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination()); Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination());
List destinations = new ArrayList(); List destinations = new ArrayList();
for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) { for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
VirtualDestination virtualDestination = (VirtualDestination) iter.next(); VirtualDestination virtualDestination = (VirtualDestination)iter.next();
Destination newNestination = virtualDestination.intercept(destination); Destination newNestination = virtualDestination.intercept(destination);
destinations.add(newNestination); destinations.add(newNestination);
} }
if (!destinations.isEmpty()) { if (!destinations.isEmpty()) {
if (destinations.size() == 1) { if (destinations.size() == 1) {
return (Destination) destinations.get(0); return (Destination)destinations.get(0);
} } else {
else {
// should rarely be used but here just in case // should rarely be used but here just in case
return createCompositeDestination(destination, destinations); return createCompositeDestination(destination, destinations);
} }
@ -79,7 +76,7 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
return new DestinationFilter(destination) { return new DestinationFilter(destination) {
public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
for (Iterator iter = destinations.iterator(); iter.hasNext();) { for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Destination destination = (Destination) iter.next(); Destination destination = (Destination)iter.next();
destination.send(context, messageSend); destination.send(context, messageSend);
} }
} }

View File

@ -16,6 +16,13 @@
*/ */
package org.apache.activemq.broker.util; package org.apache.activemq.broker.util;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
@ -23,15 +30,8 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
/** /**
* An agent which listens to commands on a JMS destination * An agent which listens to commands on a JMS destination
@ -50,7 +50,6 @@ public class CommandAgent implements Service, InitializingBean, DisposableBean,
private Session session; private Session session;
private MessageConsumer consumer; private MessageConsumer consumer;
public void start() throws Exception { public void start() throws Exception {
session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
listener = new CommandMessageListener(session); listener = new CommandMessageListener(session);
@ -68,8 +67,7 @@ public class CommandAgent implements Service, InitializingBean, DisposableBean,
try { try {
consumer.close(); consumer.close();
consumer = null; consumer = null;
} } catch (JMSException e) {
catch (JMSException e) {
stopper.onException(this, e); stopper.onException(this, e);
} }
} }
@ -77,8 +75,7 @@ public class CommandAgent implements Service, InitializingBean, DisposableBean,
try { try {
session.close(); session.close();
session = null; session = null;
} } catch (JMSException e) {
catch (JMSException e) {
stopper.onException(this, e); stopper.onException(this, e);
} }
} }
@ -86,15 +83,15 @@ public class CommandAgent implements Service, InitializingBean, DisposableBean,
try { try {
connection.close(); connection.close();
connection = null; connection = null;
} } catch (JMSException e) {
catch (JMSException e) {
stopper.onException(this, e); stopper.onException(this, e);
} }
} }
stopper.throwFirstException(); stopper.throwFirstException();
} }
// the following methods ensure that we are created on startup and the lifecycles respected // the following methods ensure that we are created on startup and the
// lifecycles respected
// TODO there must be a simpler way? // TODO there must be a simpler way?
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
start(); start();
@ -116,10 +113,8 @@ public class CommandAgent implements Service, InitializingBean, DisposableBean,
return true; return true;
} }
// Properties // Properties
//------------------------------------------------------------------------- // -------------------------------------------------------------------------
public String getBrokerUrl() { public String getBrokerUrl() {
return brokerUrl; return brokerUrl;
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.util; package org.apache.activemq.broker.util;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.JMSException;
/** /**
* Represents a processor of text based commands * Represents a processor of text based commands

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.util; package org.apache.activemq.broker.util;
import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.broker.view; package org.apache.activemq.broker.view;
import java.io.FileWriter;
import java.io.PrintWriter;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.io.FileWriter;
import java.io.PrintWriter;
/** /**
* Useful base class * Useful base class
* *
@ -46,8 +46,7 @@ public abstract class DotFileInterceptorSupport extends BrokerFilter {
PrintWriter writer = new PrintWriter(new FileWriter(file)); PrintWriter writer = new PrintWriter(new FileWriter(file));
try { try {
generateFile(writer); generateFile(writer);
} } finally {
finally {
writer.close(); writer.close();
} }
} }

View File

@ -16,14 +16,6 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.CustomDestination;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.component.jms.JmsBinding;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
@ -32,6 +24,14 @@ import javax.jms.QueueSender;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.CustomDestination;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.component.jms.JmsBinding;
/** /**
* @version $Revision: $ * @version $Revision: $
*/ */

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.camel.Consumer; import org.apache.camel.Consumer;
@ -24,12 +30,9 @@ import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer; import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import javax.jms.*;
import javax.jms.IllegalStateException;
/** /**
* A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from
* Camel {@link Endpoint} * a Camel {@link Endpoint}
* *
* @version $Revision: $ * @version $Revision: $
*/ */
@ -62,11 +65,9 @@ public class CamelMessageConsumer implements MessageConsumer {
if (pollingConsumer != null) { if (pollingConsumer != null) {
pollingConsumer.stop(); pollingConsumer.stop();
} }
} } catch (JMSException e) {
catch (JMSException e) {
throw e; throw e;
} } catch (Exception e) {
catch (Exception e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }
@ -99,7 +100,7 @@ public class CamelMessageConsumer implements MessageConsumer {
} }
// Properties // Properties
//----------------------------------------------------------------------- // -----------------------------------------------------------------------
public CamelDestination getDestination() { public CamelDestination getDestination() {
return destination; return destination;
@ -122,7 +123,7 @@ public class CamelMessageConsumer implements MessageConsumer {
} }
// Implementation methods // Implementation methods
//----------------------------------------------------------------------- // -----------------------------------------------------------------------
protected PollingConsumer getPollingConsumer() throws JMSException { protected PollingConsumer getPollingConsumer() throws JMSException {
try { try {
@ -131,11 +132,9 @@ public class CamelMessageConsumer implements MessageConsumer {
pollingConsumer.start(); pollingConsumer.start();
} }
return pollingConsumer; return pollingConsumer;
} } catch (JMSException e) {
catch (JMSException e) {
throw e; throw e;
} } catch (Exception e) {
catch (Exception e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }
@ -144,8 +143,7 @@ public class CamelMessageConsumer implements MessageConsumer {
if (exchange != null) { if (exchange != null) {
Message message = destination.getBinding().makeJmsMessage(exchange, session); Message message = destination.getBinding().makeJmsMessage(exchange, session);
return message; return message;
} } else {
else {
return null; return null;
} }
} }
@ -160,11 +158,9 @@ public class CamelMessageConsumer implements MessageConsumer {
}); });
answer.start(); answer.start();
return answer; return answer;
} } catch (JMSException e) {
catch (JMSException e) {
throw e; throw e;
} } catch (Exception e) {
catch (Exception e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }

View File

@ -16,6 +16,11 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.ActiveMQMessageProducerSupport; import org.apache.activemq.ActiveMQMessageProducerSupport;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
@ -24,11 +29,6 @@ import org.apache.camel.Producer;
import org.apache.camel.component.jms.JmsExchange; import org.apache.camel.component.jms.JmsExchange;
import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ObjectHelper;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
/** /**
* A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a * A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
* Camel {@link Endpoint} * Camel {@link Endpoint}
@ -47,11 +47,9 @@ public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
this.endpoint = endpoint; this.endpoint = endpoint;
try { try {
this.producer = endpoint.createProducer(); this.producer = endpoint.createProducer();
} } catch (JMSException e) {
catch (JMSException e) {
throw e; throw e;
} } catch (Exception e) {
catch (Exception e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }
@ -69,33 +67,28 @@ public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
closed = true; closed = true;
try { try {
producer.stop(); producer.stop();
} } catch (JMSException e) {
catch (JMSException e) {
throw e; throw e;
} } catch (Exception e) {
catch (Exception e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }
} }
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
CamelDestination camelDestination = null; CamelDestination camelDestination = null;
if (ObjectHelper.equals(destination, this.destination)) { if (ObjectHelper.equals(destination, this.destination)) {
camelDestination = this.destination; camelDestination = this.destination;
} } else {
else {
// TODO support any CamelDestination? // TODO support any CamelDestination?
throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination); throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
} }
try { try {
JmsExchange exchange = new JmsExchange(endpoint.getContext(), camelDestination.getBinding(), message); JmsExchange exchange = new JmsExchange(endpoint.getContext(), camelDestination.getBinding(), message);
producer.process(exchange); producer.process(exchange);
} } catch (JMSException e) {
catch (JMSException e) {
throw e; throw e;
} } catch (Exception e) {
catch (Exception e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.QueueReceiver; import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQSession;
/** /**
* A JMS {@link Queue} object which refers to a Camel endpoint * A JMS {@link Queue} object which refers to a Camel endpoint

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueReceiver; import javax.jms.QueueReceiver;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
/** /**
* A JMS {@link javax.jms.QueueReceiver} which consumes message exchanges from a * A JMS {@link javax.jms.QueueReceiver} which consumes message exchanges from a
* Camel {@link org.apache.camel.Endpoint} * Camel {@link org.apache.camel.Endpoint}

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQSession;
/** /**
* A JMS {@link javax.jms.Topic} object which refers to a Camel endpoint * A JMS {@link javax.jms.Topic} object which refers to a Camel endpoint
* *

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint; import org.apache.camel.Endpoint;
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.Topic;
import javax.jms.Message;
/** /**
* A JMS {@link javax.jms.TopicPublisher} which sends message exchanges to a * A JMS {@link javax.jms.TopicPublisher} which sends message exchanges to a
* Camel {@link Endpoint} * Camel {@link Endpoint}

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.camel; package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
/** /**
* A JMS {@link javax.jms.TopicSubscriber} which consumes message exchanges from a * A JMS {@link javax.jms.TopicSubscriber} which consumes message exchanges from a
* Camel {@link Endpoint} * Camel {@link Endpoint}

View File

@ -16,16 +16,17 @@
*/ */
package org.apache.activemq.command; package org.apache.activemq.command;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.util.JMSExceptionSupport;
import javax.jms.JMSException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import javax.jms.JMSException;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.util.JMSExceptionSupport;
/** /**
* An implementation of {@link BlobMessage} for out of band BLOB transfer * An implementation of {@link BlobMessage} for out of band BLOB transfer
* *
@ -45,7 +46,6 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
private transient BlobUploader blobUploader; private transient BlobUploader blobUploader;
private transient URL url; private transient URL url;
public Message copy() { public Message copy() {
ActiveMQBlobMessage copy = new ActiveMQBlobMessage(); ActiveMQBlobMessage copy = new ActiveMQBlobMessage();
copy(copy); copy(copy);
@ -76,7 +76,8 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
} }
/** /**
* The MIME type of the BLOB which can be used to apply different content types to messages. * The MIME type of the BLOB which can be used to apply different content
* types to messages.
* *
* @openwire:property version=3 cache=true * @openwire:property version=3 cache=true
*/ */
@ -96,7 +97,8 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
} }
/** /**
* The name of the attachment which can be useful information if transmitting files over ActiveMQ * The name of the attachment which can be useful information if
* transmitting files over ActiveMQ
* *
* @openwire:property version=3 cache=false * @openwire:property version=3 cache=false
*/ */
@ -131,8 +133,7 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
if (url == null && remoteBlobUrl != null) { if (url == null && remoteBlobUrl != null) {
try { try {
url = new URL(remoteBlobUrl); url = new URL(remoteBlobUrl);
} } catch (MalformedURLException e) {
catch (MalformedURLException e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }
@ -144,7 +145,6 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
remoteBlobUrl = url != null ? url.toExternalForm() : null; remoteBlobUrl = url != null ? url.toExternalForm() : null;
} }
public BlobUploader getBlobUploader() { public BlobUploader getBlobUploader() {
return blobUploader; return blobUploader;
} }
@ -156,13 +156,13 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
public void onSend() throws JMSException { public void onSend() throws JMSException {
super.onSend(); super.onSend();
// lets ensure we upload the BLOB first out of band before we send the message // lets ensure we upload the BLOB first out of band before we send the
// message
if (blobUploader != null) { if (blobUploader != null) {
try { try {
URL value = blobUploader.upload(this); URL value = blobUploader.upload(this);
setURL(value); setURL(value);
} } catch (IOException e) {
catch (IOException e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }

View File

@ -16,17 +16,29 @@
*/ */
package org.apache.activemq.command; package org.apache.activemq.command;
import org.apache.activemq.ActiveMQConnection; import java.io.DataInputStream;
import org.apache.activemq.util.ByteArrayInputStream; import java.io.DataOutputStream;
import org.apache.activemq.util.ByteArrayOutputStream; import java.io.EOFException;
import org.apache.activemq.util.*; import java.io.FilterOutputStream;
import java.io.IOException;
import javax.jms.*; import java.io.InputStream;
import java.io.*; import java.io.OutputStream;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.JMSExceptionSupport;
/** /**
* A <CODE>BytesMessage</CODE> object is used to send a message containing a * A <CODE>BytesMessage</CODE> object is used to send a message containing a
* stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE> * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>

View File

@ -16,9 +16,15 @@
*/ */
package org.apache.activemq.command; package org.apache.activemq.command;
import org.apache.activemq.jndi.JNDIBaseStorable; import java.io.Externalizable;
import org.apache.activemq.util.IntrospectionSupport; import java.io.IOException;
import org.apache.activemq.util.URISupport; import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -26,16 +32,10 @@ import javax.jms.Queue;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic; import javax.jms.TemporaryTopic;
import javax.jms.Topic; import javax.jms.Topic;
import java.io.Externalizable;
import java.io.IOException; import org.apache.activemq.jndi.JNDIBaseStorable;
import java.io.ObjectInput; import org.apache.activemq.util.IntrospectionSupport;
import java.io.ObjectOutput; import org.apache.activemq.util.URISupport;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
/** /**
* @openwire:marshaller * @openwire:marshaller

View File

@ -142,7 +142,6 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
* Builds the message body from data * Builds the message body from data
* *
* @throws JMSException * @throws JMSException
*
* @throws IOException * @throws IOException
*/ */
private void loadContent() throws JMSException { private void loadContent() throws JMSException {
@ -503,7 +502,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
*/ */
public void setBoolean(String name, boolean value) throws JMSException { public void setBoolean(String name, boolean value) throws JMSException {
initializeWriting(); initializeWriting();
put(name, (value) ? Boolean.TRUE : Boolean.FALSE); put(name, value ? Boolean.TRUE : Boolean.FALSE);
} }
/** /**

View File

@ -415,8 +415,11 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
} }
protected void checkValidObject(Object value) throws MessageFormatException { protected void checkValidObject(Object value) throws MessageFormatException {
if (!(value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long || value instanceof Float
|| value instanceof Double || value instanceof Character || value instanceof String || value == null)) { boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long ;
valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
if (!valid) {
ActiveMQConnection conn = getConnection(); ActiveMQConnection conn = getConnection();
// conn is null if we are in the broker rather than a JMS client // conn is null if we are in the broker rather than a JMS client

View File

@ -17,15 +17,6 @@
package org.apache.activemq.command; package org.apache.activemq.command;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
import org.apache.activemq.util.JMSExceptionSupport;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -36,6 +27,16 @@ import java.io.Serializable;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
import org.apache.activemq.util.JMSExceptionSupport;
/** /**
* An <CODE>ObjectMessage</CODE> object is used to send a message that * An <CODE>ObjectMessage</CODE> object is used to send a message that
* contains a serializable object in the Java programming language ("Java * contains a serializable object in the Java programming language ("Java

View File

@ -16,17 +16,6 @@
*/ */
package org.apache.activemq.command; package org.apache.activemq.command;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import javax.jms.TextMessage;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -35,6 +24,18 @@ import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
/** /**
* @openwire:marshaller code="28" * @openwire:marshaller code="28"
* @version $Revision$ * @version $Revision$

View File

@ -19,19 +19,17 @@ package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.CommandVisitor;
/** /**
*
* @openwire:marshaller code="9" * @openwire:marshaller code="9"
* @version $Revision: 1.7 $ * @version $Revision: 1.7 $
*/ */
public class RemoveSubscriptionInfo extends BaseCommand { public class RemoveSubscriptionInfo extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.REMOVE_SUBSCRIPTION_INFO; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
protected ConnectionId connectionId; protected ConnectionId connectionId;
protected String clientId; protected String clientId;
protected String subscriptionName; protected String subscriptionName;
public byte getDataStructureType() { public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE; return DATA_STRUCTURE_TYPE;
} }
@ -42,6 +40,7 @@ public class RemoveSubscriptionInfo extends BaseCommand {
public ConnectionId getConnectionId() { public ConnectionId getConnectionId() {
return connectionId; return connectionId;
} }
public void setConnectionId(ConnectionId connectionId) { public void setConnectionId(ConnectionId connectionId) {
this.connectionId = connectionId; this.connectionId = connectionId;
} }
@ -81,7 +80,7 @@ public class RemoveSubscriptionInfo extends BaseCommand {
} }
public Response visit(CommandVisitor visitor) throws Exception { public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processRemoveSubscription( this ); return visitor.processRemoveSubscription(this);
} }
} }

View File

@ -24,7 +24,7 @@ import org.apache.activemq.state.CommandVisitor;
*/ */
public class Response extends BaseCommand { public class Response extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.RESPONSE; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
int correlationId; int correlationId;
public byte getDataStructureType() { public byte getDataStructureType() {

View File

@ -32,7 +32,7 @@ public class CompositeDestinationFilter extends DestinationFilter {
filters = new DestinationFilter[destinations.length]; filters = new DestinationFilter[destinations.length];
for (int i = 0; i < destinations.length; i++) { for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination childDestination = destinations[i]; ActiveMQDestination childDestination = destinations[i];
filters[i]= DestinationFilter.parseFilter(childDestination); filters[i] = DestinationFilter.parseFilter(childDestination);
} }
} }

View File

@ -51,8 +51,7 @@ public class DestinationMap {
* or composite destinations this will typically be a List of matching * or composite destinations this will typically be a List of matching
* values. * values.
* *
* @param key * @param key the destination to lookup
* the destination to lookup
* @return a List of matching values or an empty list if there are no * @return a List of matching values or an empty list if there are no
* matching values. * matching values.
*/ */
@ -64,9 +63,8 @@ public class DestinationMap {
ActiveMQDestination childDestination = destinations[i]; ActiveMQDestination childDestination = destinations[i];
Object value = get(childDestination); Object value = get(childDestination);
if (value instanceof Set) { if (value instanceof Set) {
answer.addAll((Set) value); answer.addAll((Set)value);
} } else if (value != null) {
else if (value != null) {
answer.add(value); answer.add(value);
} }
} }
@ -121,7 +119,6 @@ public class DestinationMap {
return topicRootNode; return topicRootNode;
} }
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -131,13 +128,12 @@ public class DestinationMap {
*/ */
protected void setEntries(List entries) { protected void setEntries(List entries) {
for (Iterator iter = entries.iterator(); iter.hasNext();) { for (Iterator iter = entries.iterator(); iter.hasNext();) {
Object element = (Object) iter.next(); Object element = (Object)iter.next();
Class type = getEntryClass(); Class type = getEntryClass();
if (type.isInstance(element)) { if (type.isInstance(element)) {
DestinationMapEntry entry = (DestinationMapEntry) element; DestinationMapEntry entry = (DestinationMapEntry)element;
put(entry.getDestination(), entry.getValue()); put(entry.getDestination(), entry.getValue());
} } else {
else {
throw new IllegalArgumentException("Each entry must be an instance of type: " + type.getName() + " but was: " + element); throw new IllegalArgumentException("Each entry must be an instance of type: " + type.getName() + " but was: " + element);
} }
} }
@ -169,7 +165,7 @@ public class DestinationMap {
if (key.isComposite()) { if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations(); ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) { for (int i = 0; i < destinations.length; i++) {
rc.add( removeAll(destinations[i]) ); rc.add(removeAll(destinations[i]));
} }
return rc; return rc;
} }
@ -183,8 +179,7 @@ public class DestinationMap {
* no matching value. If there are multiple values, the results are sorted * no matching value. If there are multiple values, the results are sorted
* and the last item (the biggest) is returned. * and the last item (the biggest) is returned.
* *
* @param destination * @param destination the destination to find the value for
* the destination to find the value for
* @return the largest matching value or null if no value matches * @return the largest matching value or null if no value matches
*/ */
public Object chooseValue(ActiveMQDestination destination) { public Object chooseValue(ActiveMQDestination destination) {
@ -202,8 +197,7 @@ public class DestinationMap {
protected DestinationMapNode getRootNode(ActiveMQDestination key) { protected DestinationMapNode getRootNode(ActiveMQDestination key) {
if (key.isQueue()) { if (key.isQueue()) {
return queueRootNode; return queueRootNode;
} } else {
else {
return topicRootNode; return topicRootNode;
} }
} }

View File

@ -31,16 +31,13 @@ public abstract class DestinationMapEntry implements InitializingBean, Comparabl
private ActiveMQDestination destination; private ActiveMQDestination destination;
public int compareTo(Object that) { public int compareTo(Object that) {
if (that instanceof DestinationMapEntry) { if (that instanceof DestinationMapEntry) {
DestinationMapEntry thatEntry = (DestinationMapEntry) that; DestinationMapEntry thatEntry = (DestinationMapEntry)that;
return ActiveMQDestination.compare(destination, thatEntry.destination); return ActiveMQDestination.compare(destination, thatEntry.destination);
} } else if (that == null) {
else if (that == null) {
return 1; return 1;
} } else {
else {
return getClass().getName().compareTo(that.getClass().getName()); return getClass().getName().compareTo(that.getClass().getName());
} }
} }

Some files were not shown because too many files have changed in this diff Show More