More checkstyle violation fixes

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@564271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-08-09 16:37:49 +00:00
parent 74a7a8bbfc
commit fc00993839
744 changed files with 5007 additions and 4370 deletions

View File

@ -99,16 +99,21 @@ import org.apache.commons.logging.LogFactory;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, true, 1000);
private final ThreadPoolExecutor asyncConnectionThread;
private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
private static final IdGenerator connectionIdGenerator = new IdGenerator();
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
public final ConcurrentHashMap activeTempDestinations = new ConcurrentHashMap();
protected boolean dispatchAsync;
protected boolean alwaysSessionAsync = true;
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, true, 1000);
private final ThreadPoolExecutor asyncConnectionThread;
// Connection state variables
private final ConnectionInfo info;
private ExceptionListener exceptionListener;
@ -127,8 +132,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
protected boolean dispatchAsync;
protected boolean alwaysSessionAsync = true;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private boolean nestedMapAndListEnabled = true;
@ -163,7 +166,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
final ConcurrentHashMap activeTempDestinations = new ConcurrentHashMap();
private AdvisoryConsumer advisoryConsumer;
private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
@ -201,7 +203,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
this.info.setManageable(true);
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

View File

@ -62,7 +62,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public static final String DEFAULT_PASSWORD = null;
public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable run) {
Thread thread = new Thread(run);
thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
@ -772,11 +772,11 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.factoryStats.setEnabled(statsEnabled);
}
synchronized public int getProducerWindowSize() {
public synchronized int getProducerWindowSize() {
return producerWindowSize;
}
synchronized public void setProducerWindowSize(int producerWindowSize) {
public synchronized void setProducerWindowSize(int producerWindowSize) {
this.producerWindowSize = producerWindowSize;
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.LinkedHashMap;

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -114,7 +114,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
}
}
synchronized public void flush() throws IOException {
public synchronized void flush() throws IOException {
flushBuffer();
}

View File

@ -27,8 +27,8 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.3 $
*/
public class ActiveMQPrefetchPolicy implements Serializable {
private static final Log log = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
private static final int MAX_PREFETCH_SIZE = (Short.MAX_VALUE - 1);
private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
private static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
private int queuePrefetch;
private int queueBrowserPrefetch;
private int topicPrefetch;
@ -135,7 +135,7 @@ public class ActiveMQPrefetchPolicy implements Serializable {
private int getMaxPrefetchLimit(int value) {
int result = Math.min(value, MAX_PREFETCH_SIZE);
if (result < value) {
log.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
}
return result;
}

View File

@ -194,7 +194,7 @@ public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
}
}
synchronized public void close() throws JMSException {
public synchronized void close() throws JMSException {
destroyConsumer();
closed = true;
}

View File

@ -125,14 +125,15 @@ public class ActiveMQQueueSession implements QueueSession {
/**
* @param destination
* @param messageSelector
* @param NoLocal
* @param noLocal
* @return
* @throws JMSException
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
if (destination instanceof Topic)
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
if (destination instanceof Topic) {
throw new InvalidDestinationException("Topics are not supported by a QueueSession");
return next.createConsumer(destination, messageSelector, NoLocal);
}
return next.createConsumer(destination, messageSelector, noLocal);
}
/**

View File

@ -178,22 +178,14 @@ import org.apache.commons.logging.LogFactory;
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
public static interface DeliveryListener {
public void beforeDelivery(ActiveMQSession session, Message msg);
void beforeDelivery(ActiveMQSession session, Message msg);
public void afterDelivery(ActiveMQSession session, Message msg);
void afterDelivery(ActiveMQSession session, Message msg);
}
private static final Log log = LogFactory.getLog(ActiveMQSession.class);
private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
protected int acknowledgementMode;
private MessageListener messageListener;
private JMSSessionStatsImpl stats;
private TransactionContext transactionContext;
private DeliveryListener deliveryListener;
private MessageTransformer transformer;
private BlobTransferPolicy blobTransferPolicy;
protected final ActiveMQConnection connection;
protected final SessionInfo info;
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@ -211,6 +203,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected final boolean debug;
protected Object sendMutex = new Object();
private MessageListener messageListener;
private JMSSessionStatsImpl stats;
private TransactionContext transactionContext;
private DeliveryListener deliveryListener;
private MessageTransformer transformer;
private BlobTransferPolicy blobTransferPolicy;
/**
* Construct the Session
*
@ -223,7 +222,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException on internal error
*/
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
this.debug = log.isDebugEnabled();
this.debug = LOG.isDebugEnabled();
this.connection = connection;
this.acknowledgementMode = acknowledgeMode;
this.asyncDispatch = asyncDispatch;
@ -602,7 +601,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}
synchronized public void dispose() throws JMSException {
public synchronized void dispose() throws JMSException {
if (!closed) {
try {
@ -765,7 +764,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
messageListener.onMessage(message);
} catch (Throwable e) {
// TODO: figure out proper way to handle error.
log.error("error dispatching message: ", e);
LOG.error("error dispatching message: ", e);
connection.onAsyncException(e);
}
@ -1577,7 +1576,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (this.debug) {
log.debug("Sending message: " + msg);
LOG.debug("Sending message: " + msg);
}
if (!connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
@ -1823,9 +1822,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
try {
c.close();
} catch (JMSException e) {
log.warn("Exception closing consumer", e);
LOG.warn("Exception closing consumer", e);
}
log.warn("Closed consumer on Command");
LOG.warn("Closed consumer on Command");
break;
}
}

View File

@ -38,7 +38,7 @@ import org.apache.commons.logging.LogFactory;
* @see javax.jms.Session
*/
public class ActiveMQSessionExecutor implements Task {
private static final transient Log log = LogFactory.getLog(ActiveMQSessionExecutor.class);
private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class);
private ActiveMQSession session;
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
@ -69,7 +69,7 @@ public class ActiveMQSessionExecutor implements Task {
// lets only warn when a significant amount of time has passed
// just in case its normal operation
if (elapsedTime > aboutUnstartedConnectionTimeout) {
log.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
+ " Received: " + message);
startedOrWarnedThatNotStarted = true;
}

View File

@ -126,15 +126,15 @@ public class ActiveMQTopicSession implements TopicSession {
/**
* @param destination
* @param messageSelector
* @param NoLocal
* @param noLocal
* @return
* @throws JMSException
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
if (destination instanceof Queue) {
throw new InvalidDestinationException("Queues are not supported by a TopicSession");
}
return next.createConsumer(destination, messageSelector, NoLocal);
return next.createConsumer(destination, messageSelector, noLocal);
}
/**

View File

@ -30,12 +30,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AdvisoryConsumer implements ActiveMQDispatcher {
private static final transient Log log = LogFactory.getLog(AdvisoryConsumer.class);
private static final transient Log LOG = LogFactory.getLog(AdvisoryConsumer.class);
int deliveredCounter;
private final ActiveMQConnection connection;
private ConsumerInfo info;
private boolean closed;
int deliveredCounter;
public AdvisoryConsumer(ActiveMQConnection connection, ConsumerId consumerId) throws JMSException {
this.connection = connection;
@ -53,7 +54,7 @@ public class AdvisoryConsumer implements ActiveMQDispatcher {
try {
this.connection.asyncSendPacket(info.createRemoveCommand());
} catch (JMSException e) {
log.info("Failed to send remove command: " + e, e);
LOG.info("Failed to send remove command: " + e, e);
}
this.connection.removeDispatcher(info.getConsumerId());
closed = true;

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -36,5 +36,5 @@ public interface Closeable {
* @throws JMSException if the JMS provider fails to close the object due to
* some internal error.
*/
public void close() throws JMSException;
void close() throws JMSException;
}

View File

@ -40,7 +40,7 @@ public class ConnectionFailedException extends JMSException {
super("The JMS connection has failed due to a Transport problem");
}
static private String extractMessage(IOException cause) {
private static String extractMessage(IOException cause) {
String m = cause.getMessage();
if (m == null || m.length() == 0)
m = cause.toString();

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -21,9 +21,9 @@ package org.apache.activemq;
* @version $Revision: 1.2 $
*/
public interface LocalTransactionEventListener {
public void beginEvent();
void beginEvent();
public void commitEvent();
void commitEvent();
public void rollbackEvent();
void rollbackEvent();
}

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -28,6 +28,6 @@ public interface Message extends javax.jms.Message {
* the MIME types of the different JMS messages, or in the case of {@link org.apache.activemq.BlobMessage}
* it allows you to create a selector on the MIME type of the BLOB body
*/
public String getJMSXMimeType();
String getJMSXMimeType();
}

View File

@ -31,11 +31,11 @@ public interface MessageAvailableConsumer extends MessageConsumer {
* Sets the listener used to notify synchronous consumers that there is a message
* available so that the {@link MessageConsumer#receiveNoWait()} can be called.
*/
public void setAvailableListener(MessageAvailableListener availableListener);
void setAvailableListener(MessageAvailableListener availableListener);
/**
* Gets the listener used to notify synchronous consumers that there is a message
* available so that the {@link MessageConsumer#receiveNoWait()} can be called.
*/
public MessageAvailableListener getAvailableListener();
MessageAvailableListener getAvailableListener();
}

View File

@ -33,10 +33,10 @@ public interface MessageTransformer {
/**
* Transforms the given message inside the producer before it is sent to the JMS bus.
*/
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
/**
* Transforms the given message inside the consumer before being dispatched to the client code
*/
public Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
}

View File

@ -29,8 +29,8 @@ package org.apache.activemq;
*/
public interface Service {
public void start() throws Exception;
void start() throws Exception;
public void stop() throws Exception;
void stop() throws Exception;
}

View File

@ -36,21 +36,21 @@ import javax.jms.Topic;
*/
public interface StreamConnection extends Connection {
public InputStream createInputStream(Destination dest) throws JMSException;
InputStream createInputStream(Destination dest) throws JMSException;
public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
public OutputStream createOutputStream(Destination dest) throws JMSException;
OutputStream createOutputStream(Destination dest) throws JMSException;
public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
/**
* Unsubscribes a durable subscription that has been created by a client.
@ -71,5 +71,5 @@ public interface StreamConnection extends Connection {
* specified.
* @since 1.1
*/
public void unsubscribe(String name) throws JMSException;
void unsubscribe(String name) throws JMSException;
}

View File

@ -24,9 +24,9 @@ package org.apache.activemq;
*/
public interface ThreadPriorities {
public static final int INBOUND_BROKER_CONNECTION = 6;
public static final int OUT_BOUND_BROKER_DISPATCH = 6;
public static final int INBOUND_CLIENT_CONNECTION = 7;
public static final int INBOUND_CLIENT_SESSION = 7;
public static final int BROKER_MANAGEMENT = 9;
int INBOUND_BROKER_CONNECTION = 6;
int OUT_BOUND_BROKER_DISPATCH = 6;
int INBOUND_CLIENT_CONNECTION = 7;
int INBOUND_CLIENT_SESSION = 7;
int BROKER_MANAGEMENT = 9;
}

View File

@ -62,10 +62,10 @@ import org.apache.commons.logging.LogFactory;
*/
public class TransactionContext implements XAResource {
static final private Log log = LogFactory.getLog(TransactionContext.class);
private static final Log LOG = LogFactory.getLog(TransactionContext.class);
// XATransactionId -> ArrayList of TransactionContext objects
private static final ConcurrentHashMap endedXATransactionContexts = new ConcurrentHashMap();
private static final ConcurrentHashMap ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap();
private final ActiveMQConnection connection;
private final LongSequenceGenerator localTransactionIdGenerator;
@ -267,12 +267,12 @@ public class TransactionContext implements XAResource {
*/
public void start(Xid xid, int flags) throws XAException {
if (log.isDebugEnabled())
log.debug("Start: " + xid);
if (isInLocalTransaction())
if (LOG.isDebugEnabled()) {
LOG.debug("Start: " + xid);
}
if (isInLocalTransaction()) {
throw new XAException(XAException.XAER_PROTO);
}
// Are we already associated?
if (associatedXid != null) {
throw new XAException(XAException.XAER_PROTO);
@ -299,8 +299,8 @@ public class TransactionContext implements XAResource {
public void end(Xid xid, int flags) throws XAException {
if (log.isDebugEnabled())
log.debug("End: " + xid);
if (LOG.isDebugEnabled())
LOG.debug("End: " + xid);
if (isInLocalTransaction())
throw new XAException(XAException.XAER_PROTO);
@ -344,8 +344,8 @@ public class TransactionContext implements XAResource {
}
public int prepare(Xid xid) throws XAException {
if (log.isDebugEnabled())
log.debug("Prepare: " + xid);
if (LOG.isDebugEnabled())
LOG.debug("Prepare: " + xid);
// We allow interleaving multiple transactions, so
// we don't limit prepare to the associated xid.
@ -373,8 +373,8 @@ public class TransactionContext implements XAResource {
public void rollback(Xid xid) throws XAException {
if (log.isDebugEnabled())
log.debug("Rollback: " + xid);
if (LOG.isDebugEnabled())
LOG.debug("Rollback: " + xid);
// We allow interleaving multiple transactions, so
// we don't limit rollback to the associated xid.
@ -398,7 +398,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
this.connection.syncSendPacket(info);
ArrayList l = (ArrayList)endedXATransactionContexts.remove(x);
ArrayList l = (ArrayList)ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (Iterator iter = l.iterator(); iter.hasNext();) {
TransactionContext ctx = (TransactionContext)iter.next();
@ -414,8 +414,8 @@ public class TransactionContext implements XAResource {
// XAResource interface
public void commit(Xid xid, boolean onePhase) throws XAException {
if (log.isDebugEnabled())
log.debug("Commit: " + xid);
if (LOG.isDebugEnabled())
LOG.debug("Commit: " + xid);
// We allow interleaving multiple transactions, so
// we don't limit commit to the associated xid.
@ -437,7 +437,7 @@ public class TransactionContext implements XAResource {
this.connection.syncSendPacket(info);
ArrayList l = (ArrayList)endedXATransactionContexts.remove(x);
ArrayList l = (ArrayList)ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (Iterator iter = l.iterator(); iter.hasNext();) {
TransactionContext ctx = (TransactionContext)iter.next();
@ -452,8 +452,8 @@ public class TransactionContext implements XAResource {
}
public void forget(Xid xid) throws XAException {
if (log.isDebugEnabled())
log.debug("Forget: " + xid);
if (LOG.isDebugEnabled())
LOG.debug("Forget: " + xid);
// We allow interleaving multiple transactions, so
// we don't limit forget to the associated xid.
@ -494,8 +494,8 @@ public class TransactionContext implements XAResource {
}
public Xid[] recover(int flag) throws XAException {
if (log.isDebugEnabled())
log.debug("Recover: " + flag);
if (LOG.isDebugEnabled())
LOG.debug("Recover: " + flag);
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
@ -551,8 +551,8 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
try {
this.connection.asyncSendPacket(info);
if (log.isDebugEnabled())
log.debug("Started XA transaction: " + transactionId);
if (LOG.isDebugEnabled())
LOG.debug("Started XA transaction: " + transactionId);
} catch (JMSException e) {
throw toXAException(e);
}
@ -563,18 +563,18 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
try {
this.connection.syncSendPacket(info);
if (log.isDebugEnabled())
log.debug("Ended XA transaction: " + transactionId);
if (LOG.isDebugEnabled())
LOG.debug("Ended XA transaction: " + transactionId);
} catch (JMSException e) {
throw toXAException(e);
}
// Add our self to the list of contexts that are interested in
// post commit/rollback events.
ArrayList l = (ArrayList)endedXATransactionContexts.get(transactionId);
ArrayList l = (ArrayList)ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
if (l == null) {
l = new ArrayList(3);
endedXATransactionContexts.put(transactionId, l);
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
l.add(this);
} else if (!l.contains(this)) {
l.add(this);

View File

@ -50,20 +50,20 @@ import org.apache.commons.logging.LogFactory;
*/
public class AdvisoryBroker extends BrokerFilter {
private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
private static final Log LOG = LogFactory.getLog(AdvisoryBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
protected final ConcurrentHashMap connections = new ConcurrentHashMap();
protected final ConcurrentHashMap consumers = new ConcurrentHashMap();
protected final ConcurrentHashMap producers = new ConcurrentHashMap();
protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
static final private IdGenerator idGenerator = new IdGenerator();
protected final ProducerId advisoryProducerId = new ProducerId();
final private LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
public AdvisoryBroker(Broker next) {
super(next);
advisoryProducerId.setConnectionId(idGenerator.generateId());
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
@ -238,7 +238,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
fireAdvisory(context, topic, messageReference.getMessage());
} catch (Exception e) {
log.warn("Failed to fire message expired advisory");
LOG.warn("Failed to fire message expired advisory");
}
}

View File

@ -44,7 +44,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision$
*/
public class ConsumerEventSource implements Service, MessageListener {
private static final Log log = LogFactory.getLog(ConsumerEventSource.class);
private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
private final Connection connection;
private final ActiveMQDestination destination;
@ -97,10 +97,10 @@ public class ConsumerEventSource implements Service, MessageListener {
fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count));
}
} else {
log.warn("Unknown command: " + command);
LOG.warn("Unknown command: " + command);
}
} else {
log.warn("Unknown message type: " + message + ". Message ignored");
LOG.warn("Unknown message type: " + message + ". Message ignored");
}
}
@ -116,9 +116,9 @@ public class ConsumerEventSource implements Service, MessageListener {
Number n = (Number)value;
return n.intValue();
}
log.warn("No consumerCount header available on the message: " + message);
LOG.warn("No consumerCount header available on the message: " + message);
} catch (Exception e) {
log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
}
return count;
}

View File

@ -23,5 +23,5 @@ package org.apache.activemq.advisory;
*/
public interface ConsumerListener {
public void onConsumerEvent(ConsumerEvent event);
void onConsumerEvent(ConsumerEvent event);
}

View File

@ -28,7 +28,7 @@ public class ConsumerStartedEvent extends ConsumerEvent {
private static final long serialVersionUID = 5088138839609391074L;
private transient final ConsumerInfo consumerInfo;
private final transient ConsumerInfo consumerInfo;
public ConsumerStartedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerInfo consumerInfo, int count) {
super(source, destination, consumerInfo.getConsumerId(), count);

View File

@ -44,7 +44,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 359679 $
*/
public class ProducerEventSource implements Service, MessageListener {
private static final Log log = LogFactory.getLog(ProducerEventSource.class);
private static final Log LOG = LogFactory.getLog(ProducerEventSource.class);
private final Connection connection;
private final ActiveMQDestination destination;
@ -97,10 +97,10 @@ public class ProducerEventSource implements Service, MessageListener {
fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count));
}
} else {
log.warn("Unknown command: " + command);
LOG.warn("Unknown command: " + command);
}
} else {
log.warn("Unknown message type: " + message + ". Message ignored");
LOG.warn("Unknown message type: " + message + ". Message ignored");
}
}
@ -111,9 +111,9 @@ public class ProducerEventSource implements Service, MessageListener {
Number n = (Number)value;
return n.intValue();
}
log.warn("No producerCount header available on the message: " + message);
LOG.warn("No producerCount header available on the message: " + message);
} catch (Exception e) {
log.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
}
return count;
}

View File

@ -23,5 +23,5 @@ package org.apache.activemq.advisory;
*/
public interface ProducerListener {
public void onProducerEvent(ProducerEvent event);
void onProducerEvent(ProducerEvent event);
}

View File

@ -28,7 +28,7 @@ public class ProducerStartedEvent extends ProducerEvent {
private static final long serialVersionUID = 5088138839609391074L;
private transient final ProducerInfo consumerInfo;
private final transient ProducerInfo consumerInfo;
public ProducerStartedEvent(ProducerEventSource source, ActiveMQDestination destination, ProducerInfo consumerInfo, int count) {
super(source, destination, consumerInfo.getProducerId(), count);

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -46,22 +46,22 @@ public interface Broker extends Region, Service {
* @param type
* @return
*/
public Broker getAdaptor(Class type);
Broker getAdaptor(Class type);
/**
* Get the id of the broker
*/
public BrokerId getBrokerId();
BrokerId getBrokerId();
/**
* Get the name of the broker
*/
public String getBrokerName();
String getBrokerName();
/**
* A remote Broker connects
*/
public void addBroker(Connection connection, BrokerInfo info);
void addBroker(Connection connection, BrokerInfo info);
/**
* Remove a BrokerInfo
@ -69,14 +69,14 @@ public interface Broker extends Region, Service {
* @param connection
* @param info
*/
public void removeBroker(Connection connection, BrokerInfo info);
void removeBroker(Connection connection, BrokerInfo info);
/**
* A client is establishing a connection with the broker.
*
* @throws Exception TODO
*/
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
/**
* A client is disconnecting from the broker.
@ -87,7 +87,7 @@ public interface Broker extends Region, Service {
* that caused the client to disconnect.
* @throws Exception TODO
*/
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception;
void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception;
/**
* Adds a session.
@ -96,7 +96,7 @@ public interface Broker extends Region, Service {
* @param info
* @throws Exception TODO
*/
public void addSession(ConnectionContext context, SessionInfo info) throws Exception;
void addSession(ConnectionContext context, SessionInfo info) throws Exception;
/**
* Removes a session.
@ -105,7 +105,7 @@ public interface Broker extends Region, Service {
* @param info
* @throws Exception TODO
*/
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception;
void removeSession(ConnectionContext context, SessionInfo info) throws Exception;
/**
* Adds a producer.
@ -113,7 +113,7 @@ public interface Broker extends Region, Service {
* @param context the enviorment the operation is being executed under.
* @throws Exception TODO
*/
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
/**
* Removes a producer.
@ -121,19 +121,19 @@ public interface Broker extends Region, Service {
* @param context the enviorment the operation is being executed under.
* @throws Exception TODO
*/
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
/**
* @return all clients added to the Broker.
* @throws Exception TODO
*/
public Connection[] getClients() throws Exception;
Connection[] getClients() throws Exception;
/**
* @return all destinations added to the Broker.
* @throws Exception TODO
*/
public ActiveMQDestination[] getDestinations() throws Exception;
ActiveMQDestination[] getDestinations() throws Exception;
/**
* Gets a list of all the prepared xa transactions.
@ -142,7 +142,7 @@ public interface Broker extends Region, Service {
* @return
* @throws Exception TODO
*/
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
/**
* Starts a transaction.
@ -151,7 +151,7 @@ public interface Broker extends Region, Service {
* @param xid
* @throws Exception TODO
*/
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception;
void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception;
/**
* Prepares a transaction. Only valid for xa transactions.
@ -161,7 +161,7 @@ public interface Broker extends Region, Service {
* @return id
* @throws Exception TODO
*/
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
/**
* Rollsback a transaction.
@ -171,7 +171,7 @@ public interface Broker extends Region, Service {
* @throws Exception TODO
*/
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception;
void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception;
/**
* Commits a transaction.
@ -181,7 +181,7 @@ public interface Broker extends Region, Service {
* @param onePhase
* @throws Exception TODO
*/
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception;
void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception;
/**
* Forgets a transaction.
@ -190,7 +190,7 @@ public interface Broker extends Region, Service {
* @param transactionId
* @throws Exception
*/
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
/**
* Get the BrokerInfo's of any connected Brokers
@ -204,24 +204,24 @@ public interface Broker extends Region, Service {
*
* @param messageDispatch
*/
public void preProcessDispatch(MessageDispatch messageDispatch);
void preProcessDispatch(MessageDispatch messageDispatch);
/**
* Notify the Broker that a dispatch has happened
*
* @param messageDispatch
*/
public void postProcessDispatch(MessageDispatch messageDispatch);
void postProcessDispatch(MessageDispatch messageDispatch);
/**
* @return true if the broker has stopped
*/
public boolean isStopped();
boolean isStopped();
/**
* @return a Set of all durable destinations
*/
public Set getDurableDestinations();
Set getDurableDestinations();
/**
* Add and process a DestinationInfo object
@ -230,7 +230,7 @@ public interface Broker extends Region, Service {
* @param info
* @throws Exception
*/
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
/**
* Remove and process a DestinationInfo object
@ -239,18 +239,18 @@ public interface Broker extends Region, Service {
* @param info
* @throws Exception
*/
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
/**
* @return true if fault tolerant
*/
public boolean isFaultTolerantConfiguration();
boolean isFaultTolerantConfiguration();
/**
* @return the connection context used to make administration operations on
* startup or via JMX MBeans
*/
public abstract ConnectionContext getAdminConnectionContext();
ConnectionContext getAdminConnectionContext();
/**
* Sets the default administration connection context used when configuring
@ -258,22 +258,22 @@ public interface Broker extends Region, Service {
*
* @param adminConnectionContext
*/
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
void setAdminConnectionContext(ConnectionContext adminConnectionContext);
/**
* @return the temp data store
*/
public Store getTempDataStore();
Store getTempDataStore();
/**
* @return the URI that can be used to connect to the local Broker
*/
public URI getVmConnectorURI();
URI getVmConnectorURI();
/**
* called when the brokerService starts
*/
public void brokerServiceStarted();
void brokerServiceStarted();
/**
* @return the BrokerService
@ -295,7 +295,7 @@ public interface Broker extends Region, Service {
* @param messageReference
* @return true if the message is expired
*/
public boolean isExpired(MessageReference messageReference);
boolean isExpired(MessageReference messageReference);
/**
* A Message has Expired
@ -303,7 +303,7 @@ public interface Broker extends Region, Service {
* @param context
* @param messageReference
*/
public void messageExpired(ConnectionContext context, MessageReference messageReference);
void messageExpired(ConnectionContext context, MessageReference messageReference);
/**
* A message needs to go the a DLQ
@ -311,6 +311,6 @@ public interface Broker extends Region, Service {
* @param context
* @param messageReference
*/
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
}

View File

@ -31,12 +31,11 @@ import org.apache.activemq.util.IOExceptionSupport;
*/
public class BrokerFactory {
static final private FactoryFinder brokerFactoryHandlerFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/broker/");
private static final FactoryFinder BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
public static BrokerFactoryHandler createBrokerFactoryHandler(String type) throws IOException {
try {
return (BrokerFactoryHandler)brokerFactoryHandlerFinder.newInstance(type);
return (BrokerFactoryHandler)BROKER_FACTORY_HANDLER_FINDER.newInstance(type);
} catch (Throwable e) {
throw IOExceptionSupport.create("Could load " + type + " factory:" + e, e);
}

View File

@ -24,5 +24,5 @@ import java.net.URI;
* @version $Revision$
*/
public interface BrokerFactoryHandler {
public BrokerService createBroker(URI brokerURI) throws Exception;
BrokerService createBroker(URI brokerURI) throws Exception;
}

View File

@ -48,7 +48,7 @@ import org.apache.activemq.kaha.Store;
*/
public class BrokerFilter implements Broker {
final protected Broker next;
protected final Broker next;
public BrokerFilter(Broker next) {
this.next = next;

View File

@ -27,6 +27,6 @@ public interface BrokerPlugin {
* Installs the plugin into the interceptor chain of the broker, returning the new
* intercepted broker to use.
*/
public Broker installPlugin(Broker broker) throws Exception;
Broker installPlugin(Broker broker) throws Exception;
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.util.HashMap;
@ -24,16 +26,16 @@ import org.apache.commons.logging.LogFactory;
*/
public class BrokerRegistry {
private static final Log log = LogFactory.getLog(BrokerRegistry.class);
static final private BrokerRegistry instance = new BrokerRegistry();
public static BrokerRegistry getInstance() {
return instance;
}
private static final Log LOG = LogFactory.getLog(BrokerRegistry.class);
private static final BrokerRegistry INSTANCE = new BrokerRegistry();
private final Object mutex = new Object();
private final HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
public static BrokerRegistry getInstance() {
return INSTANCE;
}
/**
* @param brokerName
* @return the BrokerService
@ -45,7 +47,7 @@ public class BrokerRegistry {
if (result == null && brokerName != null && brokerName.equals(BrokerService.DEFAULT_BROKER_NAME)) {
result = findFirst();
if (result != null) {
log.warn("Broker localhost not started so using " + result.getBrokerName() + " instead");
LOG.warn("Broker localhost not started so using " + result.getBrokerName() + " instead");
}
}
}

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -24,5 +24,5 @@ package org.apache.activemq.broker;
*/
public interface BrokerServiceAware {
public void setBrokerService(BrokerService brokerService);
void setBrokerService(BrokerService brokerService);
}

View File

@ -31,78 +31,78 @@ public interface Connection extends Service {
/**
* @return the connector that created this connection.
*/
public Connector getConnector();
Connector getConnector();
/**
* Sends a message to the client.
*
* @param message the message to send to the client.
*/
public void dispatchSync(Command message);
void dispatchSync(Command message);
/**
* Sends a message to the client.
*
* @param command
*/
public void dispatchAsync(Command command);
void dispatchAsync(Command command);
/**
* Services a client command and submits it to the broker.
*
* @param command
*/
public Response service(Command command);
Response service(Command command);
/**
* Handles an unexpected error associated with a connection.
*
* @param error
*/
public void serviceException(Throwable error);
void serviceException(Throwable error);
/**
* @return true if the Connection is slow
*/
public boolean isSlow();
boolean isSlow();
/**
* @return if after being marked, the Connection is still writing
*/
public boolean isBlocked();
boolean isBlocked();
/**
* @return true if the Connection is connected
*/
public boolean isConnected();
boolean isConnected();
/**
* @return true if the Connection is active
*/
public boolean isActive();
boolean isActive();
/**
* Returns the number of messages to be dispatched to this connection
*/
public int getDispatchQueueSize();
int getDispatchQueueSize();
/**
* Returns the statistics for this connection
*/
public ConnectionStatistics getStatistics();
ConnectionStatistics getStatistics();
/**
* @return true if the Connection will process control commands
*/
public boolean isManageable();
boolean isManageable();
/**
* @return the source address for this connection
*/
public String getRemoteAddress();
String getRemoteAddress();
public void serviceExceptionAsync(IOException e);
void serviceExceptionAsync(IOException e);
public String getConnectionId();
String getConnectionId();
}

View File

@ -31,10 +31,10 @@ public interface Connector extends Service {
*
* @return
*/
public BrokerInfo getBrokerInfo();
BrokerInfo getBrokerInfo();
/**
* @return the statistics for this connector
*/
public ConnectorStatistics getStatistics();
ConnectorStatistics getStatistics();
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;

View File

@ -51,7 +51,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class TransactionBroker extends BrokerFilter {
private static final Log log = LogFactory.getLog(TransactionBroker.class);
private static final Log LOG = LogFactory.getLog(TransactionBroker.class);
// The prepared XA transactions.
private TransactionStore transactionStore;
@ -222,8 +222,8 @@ public class TransactionBroker extends BrokerFilter {
if (sync != null && transaction != null) {
transaction.removeSynchronization(sync);
}
if (log.isDebugEnabled()) {
log.debug("IGNORING duplicate message " + message);
if (LOG.isDebugEnabled()) {
LOG.debug("IGNORING duplicate message " + message);
}
}
}
@ -234,7 +234,7 @@ public class TransactionBroker extends BrokerFilter {
Transaction transaction = (Transaction)iter.next();
transaction.rollback();
} catch (Exception e) {
log.warn("ERROR Rolling back disconnected client's transactions: ", e);
LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
}
iter.remove();
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.io.IOException;
@ -143,7 +145,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private boolean networkConnection;
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge;
final private TaskRunnerFactory taskRunnerFactory;
private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionState connectionState;
static class TransportConnectionState extends org.apache.activemq.state.ConnectionState {
@ -366,7 +368,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception {
public synchronized Response processBeginTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = null;
if (cs != null) {
@ -383,14 +385,14 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processEndTransaction(TransactionInfo info) throws Exception {
public synchronized Response processEndTransaction(TransactionInfo info) throws Exception {
// No need to do anything. This packet is just sent by the client
// make sure he is synced with the server as commit command could
// come from a different connection.
return null;
}
synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception {
public synchronized Response processPrepareTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = null;
if (cs != null) {
@ -415,7 +417,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
}
}
synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
public synchronized Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
cs.removeTransactionState(info.getTransactionId());
@ -423,7 +425,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
public synchronized Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
cs.removeTransactionState(info.getTransactionId());
@ -431,7 +433,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception {
public synchronized Response processRollbackTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
cs.removeTransactionState(info.getTransactionId());
@ -439,14 +441,14 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception {
public synchronized Response processForgetTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
broker.forgetTransaction(context, info.getTransactionId());
return null;
}
synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception {
public synchronized Response processRecoverTransactions(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
@ -475,7 +477,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processAddDestination(DestinationInfo info) throws Exception {
public synchronized Response processAddDestination(DestinationInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.addDestinationInfo(cs.getContext(), info);
if (info.getDestination().isTemporary()) {
@ -484,7 +486,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception {
public synchronized Response processRemoveDestination(DestinationInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.removeDestinationInfo(cs.getContext(), info);
if (info.getDestination().isTemporary()) {
@ -493,7 +495,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processAddProducer(ProducerInfo info) throws Exception {
public synchronized Response processAddProducer(ProducerInfo info) throws Exception {
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
@ -512,7 +514,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processRemoveProducer(ProducerId id) throws Exception {
public synchronized Response processRemoveProducer(ProducerId id) throws Exception {
SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
@ -527,7 +529,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception {
public synchronized Response processAddConsumer(ConsumerInfo info) throws Exception {
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
@ -546,7 +548,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception {
public synchronized Response processRemoveConsumer(ConsumerId id) throws Exception {
SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
@ -561,7 +563,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processAddSession(SessionInfo info) throws Exception {
public synchronized Response processAddSession(SessionInfo info) throws Exception {
ConnectionId connectionId = info.getSessionId().getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
// Avoid replaying dup commands
@ -576,7 +578,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processRemoveSession(SessionId id) throws Exception {
public synchronized Response processRemoveSession(SessionId id) throws Exception {
ConnectionId connectionId = id.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState session = cs.getSessionState(id);
@ -665,7 +667,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
synchronized public Response processRemoveConnection(ConnectionId id) {
public synchronized Response processRemoveConnection(ConnectionId id) {
TransportConnectionState cs = lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are
// shutting down.
@ -1061,7 +1063,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return starting;
}
synchronized protected void setStarting(boolean starting) {
protected synchronized void setStarting(boolean starting) {
this.starting = starting;
}

View File

@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.5 $
*/
public class TransportStatusDetector implements Service, Runnable {
private static final Log log = LogFactory.getLog(TransportStatusDetector.class);
private static final Log LOG = LogFactory.getLog(TransportStatusDetector.class);
private TransportConnector connector;
private Set collectionCandidates = new CopyOnWriteArraySet();
private AtomicBoolean started = new AtomicBoolean(false);
@ -86,11 +86,11 @@ public class TransportStatusDetector implements Service, Runnable {
}
protected void doCollection(TransportConnection tc) {
log.warn("found a blocked client - stopping: " + tc);
LOG.warn("found a blocked client - stopping: " + tc);
try {
tc.stop();
} catch (Exception e) {
log.error("Error stopping " + tc, e);
LOG.error("Error stopping " + tc, e);
}
}
@ -101,7 +101,7 @@ public class TransportStatusDetector implements Service, Runnable {
doSweep();
Thread.sleep(sweepInterval);
} catch (Throwable e) {
log.error("failed to complete a sweep for blocked clients", e);
LOG.error("failed to complete a sweep for blocked clients", e);
}
}
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.util.concurrent.atomic.AtomicBoolean;
@ -51,7 +53,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class MasterBroker extends InsertableMutableBrokerFilter {
private static final Log log = LogFactory.getLog(MasterBroker.class);
private static final Log LOG = LogFactory.getLog(MasterBroker.class);
private Transport slave;
private AtomicBoolean started = new AtomicBoolean(false);
@ -86,7 +88,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
}
}
} catch (Exception e) {
log.error("Failed to get Connections", e);
LOG.error("Failed to get Connections", e);
}
}
@ -344,7 +346,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
try {
slave.oneway(command);
} catch (Throwable e) {
log.error("Slave Failed", e);
LOG.error("Slave Failed", e);
stopProcessing();
}
}
@ -354,10 +356,10 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
Response response = (Response)slave.request(command);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
log.error("Slave Failed", er.getException());
LOG.error("Slave Failed", er.getException());
}
} catch (Throwable e) {
log.error("Slave Failed", e);
LOG.error("Slave Failed", e);
}
}
}

View File

@ -55,7 +55,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class MasterConnector implements Service, BrokerServiceAware {
private static final Log log = LogFactory.getLog(MasterConnector.class);
private static final Log LOG = LogFactory.getLog(MasterConnector.class);
private BrokerService broker;
private URI remoteURI;
private URI localURI;
@ -104,7 +104,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
}
localBroker = TransportFactory.connect(localURI);
remoteBroker = TransportFactory.connect(remoteURI);
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object command) {
@ -141,7 +141,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
startBridge();
} catch (Exception e) {
masterActive.set(false);
log.error("Failed to start network bridge: " + e, e);
LOG.error("Failed to start network bridge: " + e, e);
}
}
};
@ -175,7 +175,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
remoteBroker.oneway(brokerInfo);
log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
}
public void stop() throws Exception {
@ -192,7 +192,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
} catch (IOException e) {
log.debug("Caught exception stopping", e);
LOG.debug("Caught exception stopping", e);
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
@ -202,7 +202,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
}
protected void serviceRemoteException(IOException error) {
log.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
shutDown();
}
@ -213,7 +213,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
command = md.getMessage();
}
if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
log.warn("The Master has shutdown");
LOG.warn("The Master has shutdown");
shutDown();
} else {
boolean responseRequired = command.isResponseRequired();
@ -232,7 +232,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
}
protected void serviceLocalException(Throwable error) {
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
ServiceSupport.dispose(this);
}

View File

@ -22,85 +22,105 @@ import org.apache.activemq.Service;
public interface BrokerViewMBean extends Service {
/**
* @return The unique id of the broker.
*/
public abstract String getBrokerId();
/**
* @return The unique id of the broker.
*/
String getBrokerId();
/**
* The Broker will fush it's caches so that the garbage
* collector can recalaim more memory.
* The Broker will fush it's caches so that the garbage collector can
* recalaim more memory.
*
* @throws Exception
*/
public void gc() throws Exception;
public void resetStatistics();
public void enableStatistics();
public void disableStatistics();
public boolean isStatisticsEnabled();
public long getTotalEnqueueCount();
public long getTotalDequeueCount();
public long getTotalConsumerCount();
public long getTotalMessageCount();
public int getMemoryPercentageUsed();
public long getMemoryLimit();
public void setMemoryLimit(long limit);
void gc() throws Exception;
void resetStatistics();
void enableStatistics();
void disableStatistics();
boolean isStatisticsEnabled();
long getTotalEnqueueCount();
long getTotalDequeueCount();
long getTotalConsumerCount();
long getTotalMessageCount();
int getMemoryPercentageUsed();
long getMemoryLimit();
void setMemoryLimit(long limit);
/**
* Shuts down the JVM.
* @param exitCode the exit code that will be reported by the JVM process when it exits.
*
* @param exitCode the exit code that will be reported by the JVM process
* when it exits.
*/
public void terminateJVM(int exitCode);
void terminateJVM(int exitCode);
/**
* Stop the broker and all it's components.
*/
public void stop() throws Exception;
public ObjectName[] getTopics();
public ObjectName[] getQueues();
public ObjectName[] getTemporaryTopics();
public ObjectName[] getTemporaryQueues();
public ObjectName[] getTopicSubscribers();
public ObjectName[] getDurableTopicSubscribers();
public ObjectName[] getInactiveDurableTopicSubscribers();
public ObjectName[] getQueueSubscribers();
public ObjectName[] getTemporaryTopicSubscribers();
public ObjectName[] getTemporaryQueueSubscribers();
/**
void stop() throws Exception;
ObjectName[] getTopics();
ObjectName[] getQueues();
ObjectName[] getTemporaryTopics();
ObjectName[] getTemporaryQueues();
ObjectName[] getTopicSubscribers();
ObjectName[] getDurableTopicSubscribers();
ObjectName[] getInactiveDurableTopicSubscribers();
ObjectName[] getQueueSubscribers();
ObjectName[] getTemporaryTopicSubscribers();
ObjectName[] getTemporaryQueueSubscribers();
/**
* Adds a Topic destination to the broker.
*
* @param name The name of the Topic
* @throws Exception
*/
public void addTopic(String name) throws Exception;
void addTopic(String name) throws Exception;
/**
* Adds a Queue destination to the broker.
*
* @param name The name of the Queue
* @throws Exception
*/
public void addQueue(String name) throws Exception;
void addQueue(String name) throws Exception;
/**
/**
* Removes a Topic destination from the broker.
*
* @param name The name of the Topic
* @throws Exception
*/
public void removeTopic(String name) throws Exception;
void removeTopic(String name) throws Exception;
/**
* Removes a Queue destination from the broker.
*
* @param name The name of the Queue
* @throws Exception
*/
public void removeQueue(String name) throws Exception;
void removeQueue(String name) throws Exception;
/**
* Creates a new durable topic subscriber
@ -109,10 +129,9 @@ public interface BrokerViewMBean extends Service {
* @param subscriberName the durable subscriber name
* @param topicName the name of the topic to subscribe to
* @param selector a selector or null
*
* @return the object name of the MBean registered in JMX
*/
public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception;
ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception;
/**
* Destroys a durable subscriber
@ -120,6 +139,6 @@ public interface BrokerViewMBean extends Service {
* @param clientId the JMS client ID
* @param subscriberName the durable subscriber name
*/
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception;
void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception;
}

View File

@ -22,52 +22,52 @@ public interface ConnectionViewMBean extends Service {
/**
* @return true if the Connection is slow
*/
public boolean isSlow();
boolean isSlow();
/**
* @return if after being marked, the Connection is still writing
*/
public boolean isBlocked();
boolean isBlocked();
/**
* @return true if the Connection is connected
*/
public boolean isConnected();
boolean isConnected();
/**
* @return true if the Connection is active
*/
public boolean isActive();
boolean isActive();
/**
* Returns the number of messages to be dispatched to this connection
*/
public int getDispatchQueueSize();
int getDispatchQueueSize();
/**
* Resets the statistics
*/
public void resetStatistics();
void resetStatistics();
/**
* Returns the number of messages enqueued on this connection
*
* @return the number of messages enqueued on this connection
*/
public long getEnqueueCount();
long getEnqueueCount();
/**
* Returns the number of messages dequeued on this connection
*
* @return the number of messages dequeued on this connection
*/
public long getDequeueCount();
long getDequeueCount();
/**
* Returns the source address for this connection
*
* @return the souce address for this connection
*/
public String getRemoteAddress();
String getRemoteAddress();
}

View File

@ -46,37 +46,36 @@ public class ConnectorView implements ConnectorViewMBean {
public BrokerInfo getBrokerInfo() {
return connector.getBrokerInfo();
}
/**
* Resets the statistics
*/
public void resetStatistics() {
connector.getStatistics().reset();
}
/**
* enable statistics gathering
*/
*/
public void enableStatistics() {
connector.getStatistics().setEnabled(true);
}
connector.getStatistics().setEnabled(true);
}
/**
* disable statistics gathering
*/
*/
public void disableStatistics() {
connector.getStatistics().setEnabled(false);
}
connector.getStatistics().setEnabled(false);
}
/**
* Returns true if statistics is enabled
*
* @return true if statistics is enabled
*/
*/
public boolean isStatisticsEnabled() {
return connector.getStatistics().isEnabled();
}
return connector.getStatistics().isEnabled();
}
/**
* Returns the number of messages enqueued on this connector
@ -85,7 +84,7 @@ public class ConnectorView implements ConnectorViewMBean {
*/
public long getEnqueueCount() {
return connector.getStatistics().getEnqueues().getCount();
}
/**

View File

@ -23,37 +23,37 @@ public interface ConnectorViewMBean extends Service {
/**
* Resets the statistics
*/
public void resetStatistics();
void resetStatistics();
/**
* enable statistics gathering
*/
public void enableStatistics();
void enableStatistics();
/**
* disable statistics gathering
*/
public void disableStatistics();
void disableStatistics();
/**
* Returns true if statistics is enabled
*
* @return true if statistics is enabled
*/
public boolean isStatisticsEnabled();
boolean isStatisticsEnabled();
/**
* Returns the number of messages enqueued on this connector
*
* @return the number of messages enqueued on this connector
*/
public long getEnqueueCount();
long getEnqueueCount();
/**
* Returns the number of messages dequeued on this connector
*
* @return the number of messages dequeued on this connector
*/
public long getDequeueCount();
long getDequeueCount();
}

View File

@ -48,7 +48,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DestinationView implements DestinationViewMBean {
private static final Log log = LogFactory.getLog(DestinationViewMBean.class);
private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class);
protected final Destination destination;
protected final ManagedRegionBroker broker;
@ -147,7 +147,7 @@ public class DestinationView implements DestinationViewMBean {
}
} catch (Throwable e) {
log.warn("exception browsing destination", e);
LOG.warn("exception browsing destination", e);
}
}
@ -188,7 +188,7 @@ public class DestinationView implements DestinationViewMBean {
}
} catch (Throwable e) {
log.warn("exception browsing destination", e);
LOG.warn("exception browsing destination", e);
}
}
return answer;
@ -224,7 +224,7 @@ public class DestinationView implements DestinationViewMBean {
}
}
} catch (Throwable e) {
log.warn("exception browsing destination", e);
LOG.warn("exception browsing destination", e);
}
}

View File

@ -24,121 +24,131 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
public interface DestinationViewMBean {
/**
* Returns the name of this destination
*/
public String getName();
/**
* Resets the managment counters.
*/
public void resetStatistics();
String getName();
/**
* Resets the managment counters.
*/
void resetStatistics();
/**
* Returns the number of messages that have been sent to the destination.
*
*
* @return The number of messages that have been sent to the destination.
*/
public long getEnqueueCount();
long getEnqueueCount();
/**
* Returns the number of messages that have been delivered (potentially not acknowledged) to consumers.
*
* @return The number of messages that have been delivered (potentially not acknowledged) to consumers.
* Returns the number of messages that have been delivered (potentially not
* acknowledged) to consumers.
*
* @return The number of messages that have been delivered (potentially not
* acknowledged) to consumers.
*/
public long getDispatchCount();
long getDispatchCount();
/**
* Returns the number of messages that have been acknowledged from the destination.
*
* @return The number of messages that have been acknowledged from the destination.
* Returns the number of messages that have been acknowledged from the
* destination.
*
* @return The number of messages that have been acknowledged from the
* destination.
*/
public long getDequeueCount();
long getDequeueCount();
/**
* Returns the number of consumers subscribed this destination.
*
*
* @return The number of consumers subscribed this destination.
*/
public long getConsumerCount();
long getConsumerCount();
/**
* Returns the number of messages in this destination which are yet to be consumed
*
* @return Returns the number of messages in this destination which are yet to be consumed
* Returns the number of messages in this destination which are yet to be
* consumed
*
* @return Returns the number of messages in this destination which are yet
* to be consumed
*/
public long getQueueSize();
/**
* @return An array of all the messages in the destination's queue.
*/
public CompositeData[] browse() throws OpenDataException;
/**
* @return A list of all the messages in the destination's queue.
*/
public TabularData browseAsTable() throws OpenDataException;
long getQueueSize();
/**
* @return An array of all the messages in the destination's queue.
* @throws InvalidSelectorException
*/
public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException;
CompositeData[] browse() throws OpenDataException;
/**
* @return A list of all the messages in the destination's queue.
* @throws InvalidSelectorException
*/
public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException;
TabularData browseAsTable() throws OpenDataException;
/**
* @return An array of all the messages in the destination's queue.
* @throws InvalidSelectorException
*/
CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException;
/**
* @return A list of all the messages in the destination's queue.
* @throws InvalidSelectorException
*/
TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException;
/**
* Sends a TextMesage to the destination.
*
* @param body the text to send
* @return the message id of the message sent.
* @throws Exception
*/
public String sendTextMessage(String body) throws Exception;
String sendTextMessage(String body) throws Exception;
/**
* Sends a TextMesage to the destination.
* @param headers the message headers and properties to set. Can only container Strings maped to primitive types.
*
* @param headers the message headers and properties to set. Can only
* container Strings maped to primitive types.
* @param body the text to send
* @return the message id of the message sent.
* @throws Exception
*/
public String sendTextMessage(Map headers, String body) throws Exception;
String sendTextMessage(Map headers, String body) throws Exception;
public int getMemoryPercentageUsed();
public long getMemoryLimit();
public void setMemoryLimit(long limit);
int getMemoryPercentageUsed();
long getMemoryLimit();
void setMemoryLimit(long limit);
/**
* Browses the current destination returning a list of messages
*/
public List browseMessages() throws InvalidSelectorException;
List browseMessages() throws InvalidSelectorException;
/**
* Browses the current destination with the given selector returning a list of messages
* Browses the current destination with the given selector returning a list
* of messages
*/
public List browseMessages(String selector) throws InvalidSelectorException;
List browseMessages(String selector) throws InvalidSelectorException;
/**
* @return longest time a message is held by a destination
*/
public long getMaxEnqueueTime();
long getMaxEnqueueTime();
/**
* @return shortest time a message is held by a destination
*/
public long getMinEnqueueTime();
long getMinEnqueueTime();
/**
* @return average time a message is held by a destination
*/
public double getAverageEnqueueTime();
double getAverageEnqueueTime();
}

View File

@ -27,7 +27,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
/**
* @return name of the durable subscription name
*/
public String getSubscriptionName();
String getSubscriptionName();
/**
* Browse messages for this durable subscriber
@ -35,7 +35,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
* @return messages
* @throws OpenDataException
*/
public CompositeData[] browse() throws OpenDataException;
CompositeData[] browse() throws OpenDataException;
/**
* Browse messages for this durable subscriber
@ -43,11 +43,11 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
* @return messages
* @throws OpenDataException
*/
public TabularData browseAsTable() throws OpenDataException;
TabularData browseAsTable() throws OpenDataException;
/**
* Destroys the durable subscription so that messages will no longer be
* stored for this subscription
*/
public void destroy() throws Exception;
void destroy() throws Exception;
}

View File

@ -73,7 +73,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ManagedRegionBroker extends RegionBroker {
private static final Log log = LogFactory.getLog(ManagedRegionBroker.class);
private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
private final ObjectName brokerObjectName;
private final Map topics = new ConcurrentHashMap();
@ -113,7 +113,7 @@ public class ManagedRegionBroker extends RegionBroker {
try {
mbeanServer.unregisterMBean(name);
} catch (InstanceNotFoundException e) {
log.warn("The MBean: " + name + " is no longer registered with JMX");
LOG.warn("The MBean: " + name + " is no longer registered with JMX");
} catch (Exception e) {
stopper.onException(this, e);
}
@ -148,13 +148,13 @@ public class ManagedRegionBroker extends RegionBroker {
view = new TopicView(this, (Topic)destination);
} else {
view = null;
log.warn("JMX View is not supported for custom destination: " + destination);
LOG.warn("JMX View is not supported for custom destination: " + destination);
}
if (view != null) {
registerDestination(objectName, destName, view);
}
} catch (Exception e) {
log.error("Failed to register destination " + destName, e);
LOG.error("Failed to register destination " + destName, e);
}
}
@ -163,7 +163,7 @@ public class ManagedRegionBroker extends RegionBroker {
ObjectName objectName = createObjectName(destName);
unregisterDestination(objectName);
} catch (Exception e) {
log.error("Failed to unregister " + destName, e);
LOG.error("Failed to unregister " + destName, e);
}
}
@ -205,7 +205,7 @@ public class ManagedRegionBroker extends RegionBroker {
subscriptionMap.put(sub, objectName);
return objectName;
} catch (Exception e) {
log.error("Failed to register subscription " + sub, e);
LOG.error("Failed to register subscription " + sub, e);
return null;
}
}
@ -216,7 +216,7 @@ public class ManagedRegionBroker extends RegionBroker {
try {
unregisterSubscription(name);
} catch (Exception e) {
log.error("Failed to unregister subscription " + sub, e);
LOG.error("Failed to unregister subscription " + sub, e);
}
}
}
@ -239,8 +239,8 @@ public class ManagedRegionBroker extends RegionBroker {
mbeanServer.registerMBean(view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
log.warn("Failed to register MBean: " + key);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to register MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
@ -253,8 +253,8 @@ public class ManagedRegionBroker extends RegionBroker {
try {
mbeanServer.unregisterMBean(key);
} catch (Throwable e) {
log.warn("Failed to unregister MBean: " + key);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
}
@ -282,7 +282,7 @@ public class ManagedRegionBroker extends RegionBroker {
mbeanServer.unregisterMBean(inactiveName);
}
} catch (Throwable e) {
log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
}
} else {
topicSubscribers.put(key, view);
@ -294,8 +294,8 @@ public class ManagedRegionBroker extends RegionBroker {
mbeanServer.registerMBean(view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
log.warn("Failed to register MBean: " + key);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to register MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
@ -310,8 +310,8 @@ public class ManagedRegionBroker extends RegionBroker {
try {
mbeanServer.unregisterMBean(key);
} catch (Throwable e) {
log.warn("Failed to unregister MBean: " + key);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
@ -337,7 +337,7 @@ public class ManagedRegionBroker extends RegionBroker {
if (infos != null) {
for (int i = 0; i < infos.length; i++) {
SubscriptionInfo info = infos[i];
log.debug("Restoring durable subscription: " + info);
LOG.debug("Restoring durable subscription: " + info);
SubscriptionKey key = new SubscriptionKey(info);
subscriptions.put(key, info);
}
@ -364,14 +364,14 @@ public class ManagedRegionBroker extends RegionBroker {
mbeanServer.registerMBean(view, objectName);
registeredMBeans.add(objectName);
} catch (Throwable e) {
log.warn("Failed to register MBean: " + key);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to register MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
inactiveDurableTopicSubscribers.put(objectName, view);
subscriptionKeys.put(key, objectName);
} catch (Exception e) {
log.error("Failed to register subscription " + info, e);
LOG.error("Failed to register subscription " + info, e);
}
}
@ -382,7 +382,7 @@ public class ManagedRegionBroker extends RegionBroker {
try {
c[i] = OpenTypeSupport.convert((Message)messages.get(i));
} catch (Throwable e) {
log.error("failed to browse : " + view, e);
LOG.error("failed to browse : " + view, e);
}
}
return c;
@ -428,7 +428,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
});
} catch (Throwable e) {
log.error("Failed to browse messages for Subscription " + view, e);
LOG.error("Failed to browse messages for Subscription " + view, e);
}
return result;

View File

@ -40,7 +40,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.1 $
*/
public class ManagedTransportConnection extends TransportConnection {
private static final Log log = LogFactory.getLog(ManagedTransportConnection.class);
private static final Log LOG = LogFactory.getLog(ManagedTransportConnection.class);
private final MBeanServer server;
private final ObjectName connectorName;
@ -101,8 +101,8 @@ public class ManagedTransportConnection extends TransportConnection {
try {
server.registerMBean(mbean, name);
} catch (Throwable e) {
log.warn("Failed to register MBean: " + name);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to register MBean: " + name);
LOG.debug("Failure reason: " + e, e);
}
}
}
@ -112,8 +112,8 @@ public class ManagedTransportConnection extends TransportConnection {
try {
server.unregisterMBean(name);
} catch (Throwable e) {
log.warn("Failed to unregister mbean: " + name);
log.debug("Failure reason: " + e, e);
LOG.warn("Failed to unregister mbean: " + name);
LOG.debug("Failure reason: " + e, e);
}
}
}

View File

@ -1,12 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -1,12 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -19,12 +19,17 @@ package org.apache.activemq.broker.jmx;
import org.apache.activemq.Service;
public interface NetworkBridgeViewMBean extends Service {
public String getLocalAddress();
public String getRemoteAddress();
public String getRemoteBrokerName();
public String getLocalBrokerName();
public long getEnqueueCounter();
public long getDequeueCounter();
}
String getLocalAddress();
String getRemoteAddress();
String getRemoteBrokerName();
String getLocalBrokerName();
long getEnqueueCounter();
long getDequeueCounter();
}

View File

@ -20,42 +20,42 @@ import org.apache.activemq.Service;
public interface NetworkConnectorViewMBean extends Service {
public String getName();
String getName();
public int getNetworkTTL();
int getNetworkTTL();
public int getPrefetchSize();
int getPrefetchSize();
public String getUserName();
String getUserName();
public boolean isBridgeTempDestinations();
boolean isBridgeTempDestinations();
public boolean isConduitSubscriptions();
boolean isConduitSubscriptions();
public boolean isDecreaseNetworkConsumerPriority();
boolean isDecreaseNetworkConsumerPriority();
public boolean isDispatchAsync();
boolean isDispatchAsync();
public boolean isDynamicOnly();
boolean isDynamicOnly();
public void setBridgeTempDestinations(boolean bridgeTempDestinations);
void setBridgeTempDestinations(boolean bridgeTempDestinations);
public void setConduitSubscriptions(boolean conduitSubscriptions);
void setConduitSubscriptions(boolean conduitSubscriptions);
public void setDispatchAsync(boolean dispatchAsync);
void setDispatchAsync(boolean dispatchAsync);
public void setDynamicOnly(boolean dynamicOnly);
void setDynamicOnly(boolean dynamicOnly);
public void setNetworkTTL(int networkTTL);
void setNetworkTTL(int networkTTL);
public void setPassword(String password);
void setPassword(String password);
public void setPrefetchSize(int prefetchSize);
void setPrefetchSize(int prefetchSize);
public void setUserName(String userName);
void setUserName(String userName);
public String getPassword();
String getPassword();
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
}

View File

@ -48,7 +48,7 @@ public class OpenTypeSupport {
Map getFields(Object o) throws OpenDataException;
}
private static final HashMap openTypeFactories = new HashMap();
private static final HashMap OPEN_TYPE_FACTORIES = new HashMap();
abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
@ -75,7 +75,7 @@ public class OpenTypeSupport {
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
}
abstract protected String getTypeName();
protected abstract String getTypeName();
protected void addItem(String name, String description, OpenType type) {
itemNamesList.add(name);
@ -257,16 +257,16 @@ public class OpenTypeSupport {
}
static {
openTypeFactories.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
openTypeFactories.put(ActiveMQBytesMessage.class, new ByteMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQMapMessage.class, new MapMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQObjectMessage.class, new ObjectMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQBytesMessage.class, new ByteMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQMapMessage.class, new MapMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQObjectMessage.class, new ObjectMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
}
public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException {
return (OpenTypeFactory)openTypeFactories.get(clazz);
return (OpenTypeFactory)OPEN_TYPE_FACTORIES.get(clazz);
}
public static CompositeData convert(Message message) throws OpenDataException {

View File

@ -19,19 +19,17 @@ package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
public interface QueueViewMBean extends DestinationViewMBean {
/**
/**
* Retrieve a message from the destination's queue.
*
* @param messageId
* the message id of the message to retrieve
* @param messageId the message id of the message to retrieve
* @return A CompositeData object which is a JMX version of the messages
* @throws OpenDataException
*/
public CompositeData getMessage(String messageId) throws OpenDataException;
CompositeData getMessage(String messageId) throws OpenDataException;
/**
* Removes a message from the queue. If the message has already been
* dispatched to another consumer, the message cannot be deleted and this
@ -39,31 +37,32 @@ public interface QueueViewMBean extends DestinationViewMBean {
*
* @param messageId
* @return true if the message was found and could be successfully deleted.
* @throws Exception
* @throws Exception
*/
public boolean removeMessage(String messageId) throws Exception;
boolean removeMessage(String messageId) throws Exception;
/**
* Removes the messages matching the given selector
*
* @return the number of messages removed
*/
public int removeMatchingMessages(String selector) throws Exception;
int removeMatchingMessages(String selector) throws Exception;
/**
* Removes the messages matching the given selector up to the maximum number of matched messages
* Removes the messages matching the given selector up to the maximum number
* of matched messages
*
* @return the number of messages removed
*/
public int removeMatchingMessages(String selector, int maximumMessages) throws Exception;
int removeMatchingMessages(String selector, int maximumMessages) throws Exception;
/**
* Removes all of the messages in the queue.
* @throws Exception
*
* @throws Exception
*/
public void purge() throws Exception;
void purge() throws Exception;
/**
* Copies a given message to another destination.
*
@ -73,21 +72,22 @@ public interface QueueViewMBean extends DestinationViewMBean {
* other destination.
* @throws Exception
*/
public boolean copyMessageTo(String messageId, String destinationName) throws Exception;
boolean copyMessageTo(String messageId, String destinationName) throws Exception;
/**
* Copies the messages matching the given selector
*
* @return the number of messages copied
*/
public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception;
int copyMatchingMessagesTo(String selector, String destinationName) throws Exception;
/**
* Copies the messages matching the given selector up to the maximum number of matched messages
* Copies the messages matching the given selector up to the maximum number
* of matched messages
*
* @return the number of messages copied
*/
public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception;
int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception;
/**
* Moves the message to another destination.
@ -98,18 +98,19 @@ public interface QueueViewMBean extends DestinationViewMBean {
* other destination.
* @throws Exception
*/
public boolean moveMessageTo(String messageId, String destinationName) throws Exception;
boolean moveMessageTo(String messageId, String destinationName) throws Exception;
/**
* Moves the messages matching the given selector
*
* @return the number of messages removed
*/
public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception;
int moveMatchingMessagesTo(String selector, String destinationName) throws Exception;
/**
* Moves the messages matching the given selector up to the maximum number of matched messages
* Moves the messages matching the given selector up to the maximum number
* of matched messages
*/
public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception;
int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception;
}

View File

@ -26,68 +26,68 @@ public interface SubscriptionViewMBean {
/**
* @return the clientId of the Connection the Subscription is on
*/
public String getClientId();
String getClientId();
/**
* @return the id of the Connection the Subscription is on
*/
public String getConnectionId();
String getConnectionId();
/**
* @return the id of the Session the subscription is on
*/
public long getSessionId();
long getSessionId();
/**
* @return the id of the Subscription
*/
public long getSubcriptionId();
long getSubcriptionId();
/**
* @return the destination name
*/
public String getDestinationName();
String getDestinationName();
/**
* @return the JMS selector on the current subscription
*/
public String getSelector();
String getSelector();
/**
* Attempts to change the current active selector on the subscription. This
* operation is not supported for persistent topics.
*/
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
/**
* @return true if the destination is a Queue
*/
public boolean isDestinationQueue();
boolean isDestinationQueue();
/**
* @return true of the destination is a Topic
*/
public boolean isDestinationTopic();
boolean isDestinationTopic();
/**
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary();
boolean isDestinationTemporary();
/**
* @return true if the subscriber is active
*/
public boolean isActive();
boolean isActive();
/**
* @return number of messages pending delivery
*/
public int getPendingQueueSize();
int getPendingQueueSize();
/**
* @return number of messages dispatched
*/
public int getDispatchedQueueSize();
int getDispatchedQueueSize();
/**
* @return number of messages that matched the subscription
@ -107,27 +107,27 @@ public interface SubscriptionViewMBean {
/**
* @return the prefetch that has been configured for this subscriber
*/
public int getPrefetchSize();
int getPrefetchSize();
/**
* @return whether or not the subscriber is retroactive or not
*/
public boolean isRetroactive();
boolean isRetroactive();
/**
* @return whether or not the subscriber is an exclusive consumer
*/
public boolean isExclusive();
boolean isExclusive();
/**
* @return whether or not the subscriber is durable (persistent)
*/
public boolean isDurable();
boolean isDurable();
/**
* @return whether or not the subscriber ignores local messages
*/
public boolean isNoLocal();
boolean isNoLocal();
/**
* @return the maximum number of pending messages allowed in addition to the
@ -135,16 +135,16 @@ public interface SubscriptionViewMBean {
* perform eviction of messages for slow consumers on non-durable
* topics.
*/
public int getMaximumPendingMessageLimit();
int getMaximumPendingMessageLimit();
/**
* @return the consumer priority
*/
public byte getPriority();
byte getPriority();
/**
* @return the name of the consumer which is only used for durable
* consumers.
*/
public String getSubcriptionName();
String getSubcriptionName();
}

View File

@ -25,12 +25,12 @@ public interface TopicSubscriptionViewMBean extends SubscriptionViewMBean {
/**
* @return the number of messages discarded due to being a slow consumer
*/
public int getDiscardedCount();
int getDiscardedCount();
/**
* @return the maximun number of messages that can be pending.
*/
public int getMaximumPendingQueueSize();
public void setMaximumPendingQueueSize(int max);
int getMaximumPendingQueueSize();
void setMaximumPendingQueueSize(int max);
}

View File

@ -48,7 +48,7 @@ import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.14 $
*/
abstract public class AbstractRegion implements Region {
public abstract class AbstractRegion implements Region {
private static final Log LOG = LogFactory.getLog(AbstractRegion.class);

View File

@ -37,18 +37,18 @@ import org.apache.activemq.selector.SelectorParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
abstract public class AbstractSubscription implements Subscription {
public abstract class AbstractSubscription implements Subscription {
static private final Log log = LogFactory.getLog(AbstractSubscription.class);
private static final Log LOG = LogFactory.getLog(AbstractSubscription.class);
protected Broker broker;
protected ConnectionContext context;
protected ConsumerInfo info;
final protected DestinationFilter destinationFilter;
protected final DestinationFilter destinationFilter;
protected final CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
private BooleanExpression selectorExpression;
private ObjectName objectName;
final protected CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
public AbstractSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
@ -58,7 +58,7 @@ abstract public class AbstractSubscription implements Subscription {
this.selectorExpression = parseSelector(info);
}
static private BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
BooleanExpression rc = null;
if (info.getSelector() != null) {
rc = new SelectorParser().parse(info.getSelector());
@ -89,7 +89,7 @@ abstract public class AbstractSubscription implements Subscription {
try {
return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
} catch (JMSException e) {
log.info("Selector failed to evaluate: " + e.getMessage(), e);
LOG.info("Selector failed to evaluate: " + e.getMessage(), e);
return false;
}
}

View File

@ -55,9 +55,9 @@ public interface Destination extends Service {
DeadLetterStrategy getDeadLetterStrategy();
public Message[] browse();
Message[] browse();
public String getName();
String getName();
public MessageStore getMessageStore();
MessageStore getMessageStore();
}

View File

@ -35,21 +35,21 @@ public abstract class DestinationFactory {
/**
* Create destination implementation.
*/
abstract public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception;
public abstract Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception;
/**
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
* objects that the persistence store is aware exist.
*/
abstract public Set getDestinations();
public abstract Set getDestinations();
/**
* Lists all the durable subscirptions for a given destination.
*/
abstract public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException;
public abstract SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException;
abstract public long getLastMessageBrokerSequenceId() throws IOException;
public abstract long getLastMessageBrokerSequenceId() throws IOException;
abstract public void setRegionBroker(RegionBroker regionBroker);
public abstract void setRegionBroker(RegionBroker regionBroker);
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
@ -49,7 +51,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
synchronized public boolean isActive() {
public synchronized boolean isActive() {
return active;
}
@ -57,7 +59,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return !active || super.isFull();
}
synchronized public void gc() {
public synchronized void gc() {
}
public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
@ -101,7 +103,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
active = false;
this.usageManager.removeUsageListener(this);
synchronized (pending) {

View File

@ -78,15 +78,15 @@ public class IndirectMessageReference implements QueueMessageReference {
this.cachedSize = message.getSize();
}
synchronized public Message getMessageHardRef() {
public synchronized Message getMessageHardRef() {
return message;
}
synchronized public int getReferenceCount() {
public synchronized int getReferenceCount() {
return referenceCount;
}
synchronized public int incrementReferenceCount() {
public synchronized int incrementReferenceCount() {
int rc = ++referenceCount;
if (persistent && rc == 1 && message == null) {
@ -105,7 +105,7 @@ public class IndirectMessageReference implements QueueMessageReference {
return rc;
}
synchronized public int decrementReferenceCount() {
public synchronized int decrementReferenceCount() {
int rc = --referenceCount;
if (persistent && rc == 0 && message != null) {
message.decrementReferenceCount();
@ -114,7 +114,7 @@ public class IndirectMessageReference implements QueueMessageReference {
return rc;
}
synchronized public Message getMessage() {
public synchronized Message getMessage() {
return message;
}
@ -122,15 +122,15 @@ public class IndirectMessageReference implements QueueMessageReference {
return "Message " + messageId + " dropped=" + dropped + " locked=" + (lockOwner != null);
}
synchronized public void incrementRedeliveryCounter() {
public synchronized void incrementRedeliveryCounter() {
this.redeliveryCounter++;
}
synchronized public boolean isDropped() {
public synchronized boolean isDropped() {
return dropped;
}
synchronized public void drop() {
public synchronized void drop() {
dropped = true;
lockOwner = null;
if (!persistent && message != null) {
@ -150,15 +150,15 @@ public class IndirectMessageReference implements QueueMessageReference {
}
}
synchronized public void unlock() {
public synchronized void unlock() {
lockOwner = null;
}
synchronized public LockOwner getLockOwner() {
public synchronized LockOwner getLockOwner() {
return lockOwner;
}
synchronized public int getRedeliveryCounter() {
public synchronized int getRedeliveryCounter() {
return redeliveryCounter;
}
@ -174,15 +174,15 @@ public class IndirectMessageReference implements QueueMessageReference {
return persistent;
}
synchronized public boolean isLocked() {
public synchronized boolean isLocked() {
return lockOwner != null;
}
synchronized public boolean isAcked() {
public synchronized boolean isAcked() {
return acked;
}
synchronized public void setAcked(boolean b) {
public synchronized void setAcked(boolean b) {
acked = b;
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.broker.region;
public interface LockOwner {
public static final LockOwner HIGH_PRIORITY_LOCK_OWNER = new LockOwner() {
LockOwner HIGH_PRIORITY_LOCK_OWNER = new LockOwner() {
public int getLockPriority() {
return Integer.MAX_VALUE;
}

View File

@ -31,29 +31,29 @@ import org.apache.activemq.command.MessageId;
*/
public interface MessageReference {
public MessageId getMessageId();
public Message getMessageHardRef();
public Message getMessage() throws IOException;
public boolean isPersistent();
MessageId getMessageId();
Message getMessageHardRef();
Message getMessage() throws IOException;
boolean isPersistent();
public Destination getRegionDestination();
Destination getRegionDestination();
public int getRedeliveryCounter();
public void incrementRedeliveryCounter();
int getRedeliveryCounter();
void incrementRedeliveryCounter();
public int getReferenceCount();
int getReferenceCount();
public int incrementReferenceCount();
public int decrementReferenceCount();
public ConsumerId getTargetConsumerId();
public int getSize();
public long getExpiration();
public String getGroupID();
public int getGroupSequence();
int incrementReferenceCount();
int decrementReferenceCount();
ConsumerId getTargetConsumerId();
int getSize();
long getExpiration();
String getGroupID();
int getGroupSequence();
/**
* Returns true if this message is expired
*/
public boolean isExpired();
boolean isExpired();
}

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -27,5 +27,5 @@ import org.apache.activemq.broker.ConnectionContext;
*/
public interface MessageReferenceFilter {
public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException;
boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException;
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
@ -44,11 +46,11 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.15 $
*/
abstract public class PrefetchSubscription extends AbstractSubscription {
public abstract class PrefetchSubscription extends AbstractSubscription {
static private final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
protected PendingMessageCursor pending;
final protected LinkedList dispatched = new LinkedList();
protected final LinkedList dispatched = new LinkedList();
protected int prefetchExtension;
protected long enqueueCounter;
protected long dispatchCounter;
@ -317,15 +319,15 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
return dispatched.size();
}
synchronized public long getDequeueCounter() {
public synchronized long getDequeueCounter() {
return dequeueCounter;
}
synchronized public long getDispatchedCounter() {
public synchronized long getDispatchedCounter() {
return dispatchCounter;
}
synchronized public long getEnqueueCounter() {
public synchronized long getEnqueueCounter() {
return enqueueCounter;
}
@ -499,7 +501,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
* (another sub may have already dispatched it for example).
* @throws IOException
*/
abstract protected boolean canDispatch(MessageReference node) throws IOException;
protected abstract boolean canDispatch(MessageReference node) throws IOException;
/**
* Used during acknowledgment to remove the message.

View File

@ -24,19 +24,19 @@ package org.apache.activemq.broker.region;
*/
public interface QueueMessageReference extends MessageReference {
public static final QueueMessageReference NULL_MESSAGE = new NullMessageReference();
QueueMessageReference NULL_MESSAGE = new NullMessageReference();
public boolean isAcked();
boolean isAcked();
public void setAcked(boolean b);
void setAcked(boolean b);
public void drop();
void drop();
public boolean isDropped();
boolean isDropped();
public boolean lock(LockOwner subscription);
boolean lock(LockOwner subscription);
public void unlock();
void unlock();
public LockOwner getLockOwner();
LockOwner getLockOwner();
}

View File

@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory;
public class QueueSubscription extends PrefetchSubscription implements LockOwner {
private static final Log log = LogFactory.getLog(QueueSubscription.class);
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
public QueueSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker, context, info);
@ -132,7 +132,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
try {
activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
} catch (JMSException e) {
log.warn("Failed to set boolean header: " + e, e);
LOG.warn("Failed to set boolean header: " + e, e);
}
}
}

View File

@ -50,7 +50,7 @@ public interface Region extends Service {
* @return TODO
* @throws Exception TODO
*/
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
/**
* Used to destroy a destination.
@ -62,14 +62,14 @@ public interface Region extends Service {
* @param timeout the max amount of time to wait for the destination to quiesce
* @throws Exception TODO
*/
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception;
void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception;
/**
* Returns a copy of the current destinations available in the region
*
* @return a copy of the regions currently active at the time of the call with the key the destination and the value the Destination.
*/
public Map getDestinationMap();
Map getDestinationMap();
/**
@ -78,14 +78,14 @@ public interface Region extends Service {
* @return TODO
* @throws Exception TODO
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
/**
* Removes a consumer.
* @param context the environment the operation is being executed under.
* @throws Exception TODO
*/
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
/**
* Deletes a durable subscription.
@ -93,7 +93,7 @@ public interface Region extends Service {
* @param info TODO
* @throws Exception TODO
*/
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception;
void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception;
/**
* Send a message to the broker to using the specified destination. The destination specified
@ -103,34 +103,34 @@ public interface Region extends Service {
* @param message
* @throws Exception TODO
*/
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception;
void send(ProducerBrokerExchange producerExchange, Message message) throws Exception;
/**
* Used to acknowledge the receipt of a message by a client.
* @param consumerExchange the environment the operation is being executed under.
* @throws Exception TODO
*/
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception;
void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception;
/**
* Allows a consumer to pull a message from a queue
*/
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception;
Response messagePull(ConnectionContext context, MessagePull pull) throws Exception;
/**
* Process a notification of a dispatch - used by a Slave Broker
* @param messageDispatchNotification
* @throws Exception TODO
*/
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
public void gc();
void gc();
/**
* Provide an exact or wildcard lookup of destinations in the region
*
* @return a set of matching destination objects.
*/
public Set getDestinations(ActiveMQDestination destination);
Set getDestinations(ActiveMQDestination destination);
}

View File

@ -74,7 +74,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class RegionBroker implements Broker {
private static final Log LOG = LogFactory.getLog(RegionBroker.class);
private static final IdGenerator brokerIdGenerator = new IdGenerator();
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private final Region queueRegion;
private final Region topicRegion;
@ -486,7 +486,7 @@ public class RegionBroker implements Broker {
// TODO: this should persist the broker id so that subsequent
// startup
// uses the same broker id.
brokerId = new BrokerId(brokerIdGenerator.generateId());
brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
}
return brokerId;
}

View File

@ -140,50 +140,50 @@ public interface Subscription extends SubscriptionRecovery {
/**
* @return the JMS selector on the current subscription
*/
public String getSelector();
String getSelector();
/**
* Attempts to change the current active selector on the subscription.
* This operation is not supported for persistent topics.
*/
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
/**
* @return the JMX object name that this subscription was registered as if applicable
*/
public ObjectName getObjectName();
ObjectName getObjectName();
/**
* Set when the subscription is registered in JMX
*/
public void setObjectName(ObjectName objectName);
void setObjectName(ObjectName objectName);
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark();
boolean isLowWaterMark();
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark();
boolean isHighWaterMark();
/**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch);
void updateConsumerPrefetch(int newPrefetch);
/**
* optimize message consumer prefetch if the consumer supports it
*
*/
public void optimizePrefetch();
void optimizePrefetch();
/**
* Called when the subscription is destroyed.
*/
public void destroy();
void destroy();
/**
* @return the prefetch size that is configured for the subscription
@ -196,6 +196,6 @@ public interface Subscription extends SubscriptionRecovery {
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired();
boolean isRecoveryRequired();
}

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import javax.jms.JMSException;

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
@ -42,25 +44,28 @@ import org.apache.commons.logging.LogFactory;
public class TopicSubscription extends AbstractSubscription {
private static final Log LOG = LogFactory.getLog(TopicSubscription.class);
private static final AtomicLong cursorNameCounter = new AtomicLong(0);
private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
protected PendingMessageCursor matched;
final protected UsageManager usageManager;
protected final UsageManager usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
protected AtomicLong prefetchExtension = new AtomicLong();
boolean singleDestination = true;
Destination destination;
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private int discarded;
private final Object matchedListMutex = new Object();
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
boolean singleDestination = true;
Destination destination;
private int memoryUsageHighWaterMark = 95;
public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + cursorNameCounter.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
this.matched = new FilePendingMessageCursor(matchedName, broker.getTempDataStore());
}
@ -165,7 +170,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
synchronized public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean wasFull = isFull();
if (ack.isStandardAck() || ack.isPoisonAck()) {

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.LinkedList;

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
@ -39,8 +41,9 @@ import org.apache.commons.logging.LogFactory;
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
static private final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
static private final AtomicLong nameCount = new AtomicLong();
private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong();
private Store store;
private String name;
private LinkedList memoryList = new LinkedList();
@ -56,7 +59,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param store
*/
public FilePendingMessageCursor(String name, Store store) {
this.name = nameCount.incrementAndGet() + "_" + name;
this.name = NAME_COUNT.incrementAndGet() + "_" + name;
this.store = store;
}

View File

@ -1,15 +1,18 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
@ -37,7 +40,7 @@ public interface PendingMessageCursor extends Service {
* @param destination
* @throws Exception
*/
public void add(ConnectionContext context, Destination destination) throws Exception;
void add(ConnectionContext context, Destination destination) throws Exception;
/**
* remove a destination
@ -46,12 +49,12 @@ public interface PendingMessageCursor extends Service {
* @param destination
* @throws Exception
*/
public void remove(ConnectionContext context, Destination destination) throws Exception;
void remove(ConnectionContext context, Destination destination) throws Exception;
/**
* @return true if there are no pending messages
*/
public boolean isEmpty();
boolean isEmpty();
/**
* check if a Destination is Empty for this cursor
@ -59,18 +62,18 @@ public interface PendingMessageCursor extends Service {
* @param destination
* @return true id the Destination is empty
*/
public boolean isEmpty(Destination destination);
boolean isEmpty(Destination destination);
/**
* reset the cursor
*/
public void reset();
void reset();
/**
* hint to the cursor to release any locks it might have grabbed after a
* reset
*/
public void release();
void release();
/**
* add message to await dispatch
@ -79,7 +82,7 @@ public interface PendingMessageCursor extends Service {
* @throws IOException
* @throws Exception
*/
public void addMessageLast(MessageReference node) throws Exception;
void addMessageLast(MessageReference node) throws Exception;
/**
* add message to await dispatch
@ -87,7 +90,7 @@ public interface PendingMessageCursor extends Service {
* @param node
* @throws Exception
*/
public void addMessageFirst(MessageReference node) throws Exception;
void addMessageFirst(MessageReference node) throws Exception;
/**
* Add a message recovered from a retroactive policy
@ -95,32 +98,32 @@ public interface PendingMessageCursor extends Service {
* @param node
* @throws Exception
*/
public void addRecoveredMessage(MessageReference node) throws Exception;
void addRecoveredMessage(MessageReference node) throws Exception;
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext();
boolean hasNext();
/**
* @return the next pending message
*/
public MessageReference next();
MessageReference next();
/**
* remove the message at the cursor position
*/
public void remove();
void remove();
/**
* @return the number of pending messages
*/
public int size();
int size();
/**
* clear all pending messages
*/
public void clear();
void clear();
/**
* Informs the Broker if the subscription needs to intervention to recover
@ -128,37 +131,37 @@ public interface PendingMessageCursor extends Service {
*
* @return true if recovery required
*/
public boolean isRecoveryRequired();
boolean isRecoveryRequired();
/**
* @return the maximum batch size
*/
public int getMaxBatchSize();
int getMaxBatchSize();
/**
* Set the max batch size
*
* @param maxBatchSize
*/
public void setMaxBatchSize(int maxBatchSize);
void setMaxBatchSize(int maxBatchSize);
/**
* Give the cursor a hint that we are about to remove messages from memory
* only
*/
public void resetForGC();
void resetForGC();
/**
* remove a node
*
* @param node
*/
public void remove(MessageReference node);
void remove(MessageReference node);
/**
* free up any internal buffers
*/
public void gc();
void gc();
/**
* Set the UsageManager
@ -166,39 +169,39 @@ public interface PendingMessageCursor extends Service {
* @param usageManager
* @see org.apache.activemq.memory.UsageManager
*/
public void setUsageManager(UsageManager usageManager);
void setUsageManager(UsageManager usageManager);
/**
* @return the usageManager
*/
public UsageManager getUsageManager();
UsageManager getUsageManager();
/**
* @return the memoryUsageHighWaterMark
*/
public int getMemoryUsageHighWaterMark();
int getMemoryUsageHighWaterMark();
/**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
/**
* @return true if the cursor is full
*/
public boolean isFull();
boolean isFull();
/**
* @return true if the cursor has buffered messages ready to deliver
*/
public boolean hasMessagesBufferedToDeliver();
boolean hasMessagesBufferedToDeliver();
/**
* destroy the cursor
*
* @throws Exception
*/
public void destroy() throws Exception;
void destroy() throws Exception;
/**
* Page in a restricted number of messages
@ -206,6 +209,6 @@ public interface PendingMessageCursor extends Service {
* @param maxItems
* @return a list of paged in messages
*/
public LinkedList pageInList(int maxItems);
LinkedList pageInList(int maxItems);
}

View File

@ -1,21 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
@ -38,7 +36,7 @@ import org.apache.commons.logging.LogFactory;
*/
class QueueStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
static private final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
private MessageStore store;
private final LinkedList<Message> batchList = new LinkedList<Message>();

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
@ -32,14 +34,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
* @version $Revision$
*/
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
static private final Log log = LogFactory.getLog(StoreDurableSubscriberCursor.class);
private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
private int pendingCount;
private String clientId;
private String subscriberName;
@ -182,7 +184,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
try {
currentCursor = getNextCursor();
} catch (Exception e) {
log.error("Failed to get current cursor ", e);
LOG.error("Failed to get current cursor ", e);
throw new RuntimeException(e);
}
result = currentCursor != null ? currentCursor.hasNext() : false;

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.region.MessageReference;
@ -29,7 +31,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class StoreQueueCursor extends AbstractPendingMessageCursor {
static private final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
private int pendingCount = 0;
private Queue queue;
private Store tmpStore;

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
@ -34,7 +36,7 @@ import org.apache.commons.logging.LogFactory;
*/
class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
static private final Log log = LogFactory.getLog(TopicStorePrefetch.class);
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
private final LinkedList<Message> batchList = new LinkedList<Message>();
private String clientId;
@ -64,7 +66,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
try {
fillBatch();
} catch (Exception e) {
log.error("Failed to fill batch", e);
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
}
@ -133,7 +135,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
try {
fillBatch();
} catch (final Exception e) {
log.error("Failed to fill batch", e);
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
if (batchList.isEmpty()) {
@ -193,7 +195,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
batchList.removeFirst();
}
if (batchList.isEmpty()) {
log.debug("Refilling batch - haven't got past first message = " + firstMessageId);
LOG.debug("Refilling batch - haven't got past first message = " + firstMessageId);
fillBatch();
}
}
@ -212,7 +214,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
try {
return store.getMessageCount(clientId, subscriberName);
} catch (IOException e) {
log.error(this + " Failed to get the outstanding message count from the store", e);
LOG.error(this + " Failed to get the outstanding message count from the store", e);
throw new RuntimeException(e);
}
}

View File

@ -1,15 +1,18 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;

View File

@ -1,12 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -1,12 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -24,6 +24,6 @@ package org.apache.activemq.broker.region.group;
*/
public interface MessageGroupMapFactory {
public MessageGroupMap createMessageGroupMap();
MessageGroupMap createMessageGroupMap();
}

View File

@ -1,12 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -32,7 +32,7 @@ public interface DeadLetterStrategy {
* @param message
* @return true if message should be sent to a dead letter queue
*/
public boolean isSendToDeadLetterQueue(Message message);
boolean isSendToDeadLetterQueue(Message message);
/**
* Returns the dead letter queue for the given destination.

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;

View File

@ -1,17 +1,19 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Queue;

View File

@ -1,15 +1,18 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;

View File

@ -45,14 +45,14 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover
return rc;
}
synchronized public boolean add(ConnectionContext context, MessageReference node) throws Exception {
public synchronized boolean add(ConnectionContext context, MessageReference node) throws Exception {
messages[tail++] = node;
if (tail >= messages.length)
tail = 0;
return true;
}
synchronized public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
public synchronized void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
// Re-dispatch the last message seen.
int t = tail;
// The buffer may not have rolled over yet..., start from the front

Some files were not shown because too many files have changed in this diff Show More