mirror of https://github.com/apache/activemq.git
Fix a bunch of javadoc warnings produced during release profile builds.
This commit is contained in:
parent
7f4bf84342
commit
5e05df1cb7
|
@ -42,8 +42,6 @@ import org.apache.activemq.usage.Usage;
|
||||||
/**
|
/**
|
||||||
* The Message Broker which routes messages, maintains subscriptions and
|
* The Message Broker which routes messages, maintains subscriptions and
|
||||||
* connections, acknowledges messages and handles transactions.
|
* connections, acknowledges messages and handles transactions.
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public interface Broker extends Region, Service {
|
public interface Broker extends Region, Service {
|
||||||
|
|
||||||
|
@ -51,7 +49,7 @@ public interface Broker extends Region, Service {
|
||||||
* Get a Broker from the Broker Stack that is a particular class
|
* Get a Broker from the Broker Stack that is a particular class
|
||||||
*
|
*
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return a Broker instance.
|
||||||
*/
|
*/
|
||||||
Broker getAdaptor(Class type);
|
Broker getAdaptor(Class type);
|
||||||
|
|
||||||
|
@ -117,7 +115,7 @@ public interface Broker extends Region, Service {
|
||||||
/**
|
/**
|
||||||
* Adds a producer.
|
* Adds a producer.
|
||||||
*
|
*
|
||||||
* @param context the enviorment the operation is being executed under.
|
* @param context the environment the operation is being executed under.
|
||||||
* @throws Exception TODO
|
* @throws Exception TODO
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -126,7 +124,7 @@ public interface Broker extends Region, Service {
|
||||||
/**
|
/**
|
||||||
* Removes a producer.
|
* Removes a producer.
|
||||||
*
|
*
|
||||||
* @param context the enviorment the operation is being executed under.
|
* @param context the environment the operation is being executed under.
|
||||||
* @throws Exception TODO
|
* @throws Exception TODO
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -146,8 +144,10 @@ public interface Broker extends Region, Service {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return a reference destination map of a region based on the destination type
|
* return a reference destination map of a region based on the destination type
|
||||||
|
*
|
||||||
* @param destination
|
* @param destination
|
||||||
* @return
|
*
|
||||||
|
* @return destination Map
|
||||||
*/
|
*/
|
||||||
public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination);
|
public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination);
|
||||||
|
|
||||||
|
@ -155,7 +155,7 @@ public interface Broker extends Region, Service {
|
||||||
* Gets a list of all the prepared xa transactions.
|
* Gets a list of all the prepared xa transactions.
|
||||||
*
|
*
|
||||||
* @param context transaction ids
|
* @param context transaction ids
|
||||||
* @return
|
* @return array of TransactionId values
|
||||||
* @throws Exception TODO
|
* @throws Exception TODO
|
||||||
*/
|
*/
|
||||||
TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
|
TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
|
||||||
|
@ -186,7 +186,6 @@ public interface Broker extends Region, Service {
|
||||||
* @param xid
|
* @param xid
|
||||||
* @throws Exception TODO
|
* @throws Exception TODO
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception;
|
void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -253,6 +252,7 @@ public interface Broker extends Region, Service {
|
||||||
*
|
*
|
||||||
* @param context
|
* @param context
|
||||||
* @param info
|
* @param info
|
||||||
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
|
void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
|
||||||
|
@ -318,7 +318,7 @@ public interface Broker extends Region, Service {
|
||||||
*
|
*
|
||||||
* @param context
|
* @param context
|
||||||
* @param messageReference
|
* @param messageReference
|
||||||
* @param subscription, may be null
|
* @param subscription (may be null)
|
||||||
*/
|
*/
|
||||||
void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription);
|
void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription);
|
||||||
|
|
||||||
|
|
|
@ -2242,8 +2242,6 @@ public class BrokerService implements Service {
|
||||||
* Factory method to create a new broker
|
* Factory method to create a new broker
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
* @throws
|
|
||||||
* @throws
|
|
||||||
*/
|
*/
|
||||||
protected Broker createBroker() throws Exception {
|
protected Broker createBroker() throws Exception {
|
||||||
regionBroker = createRegionBroker();
|
regionBroker = createRegionBroker();
|
||||||
|
@ -3037,7 +3035,6 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets if the broker allowed to restart on shutdown.
|
* Sets if the broker allowed to restart on shutdown.
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
public void setRestartAllowed(boolean restartAllowed) {
|
public void setRestartAllowed(boolean restartAllowed) {
|
||||||
this.restartAllowed = restartAllowed;
|
this.restartAllowed = restartAllowed;
|
||||||
|
|
|
@ -16,14 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker;
|
package org.apache.activemq.broker;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.broker.region.ConnectionStatistics;
|
import org.apache.activemq.broker.region.ConnectionStatistics;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.ConnectionControl;
|
import org.apache.activemq.command.ConnectionControl;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -109,7 +109,8 @@ public interface Connection extends Service {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return true if a network connection
|
* return true if a network connection
|
||||||
* @return
|
*
|
||||||
|
* @return if this is a network connection
|
||||||
*/
|
*/
|
||||||
boolean isNetworkConnection();
|
boolean isNetworkConnection();
|
||||||
|
|
||||||
|
|
|
@ -186,7 +186,7 @@ public class ConnectionContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return
|
* @return true if in recovery mode.
|
||||||
*/
|
*/
|
||||||
public boolean isInRecoveryMode() {
|
public boolean isInRecoveryMode() {
|
||||||
return inRecoveryMode;
|
return inRecoveryMode;
|
||||||
|
|
|
@ -16,11 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker;
|
package org.apache.activemq.broker;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.store.PersistenceAdapter;
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents a lock service to ensure that a broker is the only master
|
* Represents a lock service to ensure that a broker is the only master
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.net.UnknownHostException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
|
||||||
import org.apache.activemq.transport.vm.VMTransport;
|
|
||||||
import org.apache.activemq.util.InetAddressUtil;
|
import org.apache.activemq.util.InetAddressUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -96,7 +95,9 @@ public class PublishedAddressPolicy {
|
||||||
* logic for this method.
|
* logic for this method.
|
||||||
*
|
*
|
||||||
* @param uriHostEntry
|
* @param uriHostEntry
|
||||||
* @return
|
*
|
||||||
|
* @return the value published for the given host.
|
||||||
|
*
|
||||||
* @throws UnknownHostException
|
* @throws UnknownHostException
|
||||||
*/
|
*/
|
||||||
protected String getPublishedHostValue(String uriHostEntry) throws UnknownHostException {
|
protected String getPublishedHostValue(String uriHostEntry) throws UnknownHostException {
|
||||||
|
|
|
@ -138,7 +138,8 @@ public class AnnotatedMBean extends StandardMBean {
|
||||||
* Extracts the Method from the MBeanOperationInfo
|
* Extracts the Method from the MBeanOperationInfo
|
||||||
*
|
*
|
||||||
* @param op
|
* @param op
|
||||||
* @return
|
*
|
||||||
|
* @return a Method
|
||||||
*/
|
*/
|
||||||
private Method getMethod(MBeanOperationInfo op) {
|
private Method getMethod(MBeanOperationInfo op) {
|
||||||
final MBeanParameterInfo[] params = op.getSignature();
|
final MBeanParameterInfo[] params = op.getSignature();
|
||||||
|
@ -156,7 +157,8 @@ public class AnnotatedMBean extends StandardMBean {
|
||||||
* @param mbean
|
* @param mbean
|
||||||
* @param method
|
* @param method
|
||||||
* @param params
|
* @param params
|
||||||
* @return
|
*
|
||||||
|
* @return a Method
|
||||||
*/
|
*/
|
||||||
private static Method getMethod(Class<?> mbean, String method, String... params) {
|
private static Method getMethod(Class<?> mbean, String method, String... params) {
|
||||||
try {
|
try {
|
||||||
|
@ -216,13 +218,11 @@ public class AnnotatedMBean extends StandardMBean {
|
||||||
return super.invoke(s, objects, strings);
|
return super.invoke(s, objects, strings);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Method getMBeanMethod(Class clazz, String methodName, String[] signature) throws ReflectiveOperationException
|
private Method getMBeanMethod(Class clazz, String methodName, String[] signature) throws ReflectiveOperationException {
|
||||||
{
|
Class[] parameterTypes = new Class[signature.length];
|
||||||
Class[] parameterTypes = new Class[signature.length];
|
for (int i = 0; i < signature.length; i++) {
|
||||||
for (int i = 0; i < signature.length; i++)
|
parameterTypes[i] = Class.forName(signature[i]);
|
||||||
{
|
}
|
||||||
parameterTypes[i] = Class.forName(signature[i]);
|
return clazz.getMethod(methodName, parameterTypes);
|
||||||
}
|
|
||||||
return clazz.getMethod(methodName, parameterTypes);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,12 +17,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.jmx;
|
package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
import javax.management.openmbean.CompositeData;
|
|
||||||
import javax.management.openmbean.TabularData;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.management.openmbean.CompositeData;
|
||||||
|
import javax.management.openmbean.TabularData;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -56,7 +57,8 @@ public class CompositeDataHelper {
|
||||||
* object
|
* object
|
||||||
*
|
*
|
||||||
* @param cdata
|
* @param cdata
|
||||||
* @return
|
*
|
||||||
|
* @return a Map of user properties
|
||||||
*/
|
*/
|
||||||
public static Map getMessageUserProperties(CompositeData cdata) {
|
public static Map getMessageUserProperties(CompositeData cdata) {
|
||||||
Map map = new HashMap();
|
Map map = new HashMap();
|
||||||
|
|
|
@ -19,12 +19,9 @@ package org.apache.activemq.broker.jmx;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.broker.TransportConnectionState;
|
|
||||||
import org.apache.activemq.state.TransactionState;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
public interface ConnectionViewMBean extends Service {
|
public interface ConnectionViewMBean extends Service {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if the Connection is slow
|
* @return true if the Connection is slow
|
||||||
*/
|
*/
|
||||||
|
@ -118,4 +115,5 @@ public interface ConnectionViewMBean extends Service {
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("The age in ms of the oldest active transaction established on this Connection.")
|
@MBeanInfo("The age in ms of the oldest active transaction established on this Connection.")
|
||||||
public Long getOldestActiveTransactionDuration();
|
public Long getOldestActiveTransactionDuration();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public interface ConnectorViewMBean extends Service {
|
||||||
/**
|
/**
|
||||||
* Returns true if link stealing is enabled on this Connector
|
* Returns true if link stealing is enabled on this Connector
|
||||||
*
|
*
|
||||||
* @returns true if link stealing is enabled.
|
* @return true if link stealing is enabled.
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Link Stealing enabled")
|
@MBeanInfo("Link Stealing enabled")
|
||||||
boolean isAllowLinkStealingEnabled();
|
boolean isAllowLinkStealingEnabled();
|
||||||
|
|
|
@ -36,7 +36,7 @@ public interface DestinationViewMBean {
|
||||||
String getName();
|
String getName();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resets the managment counters.
|
* Resets the management counters.
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Resets statistics.")
|
@MBeanInfo("Resets statistics.")
|
||||||
void resetStatistics();
|
void resetStatistics();
|
||||||
|
@ -193,7 +193,7 @@ public interface DestinationViewMBean {
|
||||||
* @param body the text to send
|
* @param body the text to send
|
||||||
* @param user
|
* @param user
|
||||||
* @param password
|
* @param password
|
||||||
* @return
|
* @return a string value
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Sends a TextMessage to a password-protected destination.")
|
@MBeanInfo("Sends a TextMessage to a password-protected destination.")
|
||||||
|
@ -206,11 +206,14 @@ public interface DestinationViewMBean {
|
||||||
* @param body the text to send
|
* @param body the text to send
|
||||||
* @param user
|
* @param user
|
||||||
* @param password
|
* @param password
|
||||||
* @return
|
*
|
||||||
|
* @return a string value
|
||||||
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Sends a TextMessage to a password-protected destination.")
|
@MBeanInfo("Sends a TextMessage to a password-protected destination.")
|
||||||
String sendTextMessage(@MBeanInfo("headers") Map<String,String> headers, @MBeanInfo("body") String body, @MBeanInfo("user") String user, @MBeanInfo("password") String password) throws Exception;
|
String sendTextMessage(@MBeanInfo("headers") Map<String,String> headers, @MBeanInfo("body") String body, @MBeanInfo("user") String user, @MBeanInfo("password") String password) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the percentage of amount of memory used
|
* @return the percentage of amount of memory used
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,9 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.jmx;
|
package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
import javax.management.openmbean.TabularData;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import javax.management.openmbean.TabularData;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the status events of the broker to indicate any warnings.
|
* Returns the status events of the broker to indicate any warnings.
|
||||||
*/
|
*/
|
||||||
|
@ -31,13 +32,13 @@ public interface HealthViewMBean {
|
||||||
* of {@link HealthStatus} on your classpath or you use something
|
* of {@link HealthStatus} on your classpath or you use something
|
||||||
* like <a href="http://jolokia.org/">jolokia</a> to access JMX.
|
* like <a href="http://jolokia.org/">jolokia</a> to access JMX.
|
||||||
*
|
*
|
||||||
* If in doubt, please use the {@link #status()} method instead!
|
* If in doubt, please use the {@link #getCurrentStatus()} method instead!
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("List of warnings and errors about the current health of the Broker - empty list is Good!")
|
@MBeanInfo("List of warnings and errors about the current health of the Broker - empty list is Good!")
|
||||||
List<HealthStatus> healthList() throws Exception;
|
List<HealthStatus> healthList() throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return String representation of the current Broker state
|
* @return String representation of the current Broker state
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("String representation of current Broker state")
|
@MBeanInfo("String representation of current Broker state")
|
||||||
String getCurrentStatus();
|
String getCurrentStatus();
|
||||||
|
|
|
@ -38,9 +38,10 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
|
||||||
* Constructor
|
* Constructor
|
||||||
*
|
*
|
||||||
* @param broker
|
* @param broker
|
||||||
|
* @param brokerService
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param userName
|
|
||||||
* @param subInfo
|
* @param subInfo
|
||||||
|
* @param subscription
|
||||||
*/
|
*/
|
||||||
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
|
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
|
||||||
super(broker, brokerService, clientId, null, subscription);
|
super(broker, brokerService, clientId, null, subscription);
|
||||||
|
|
|
@ -48,7 +48,7 @@ public interface Log4JConfigViewMBean {
|
||||||
/**
|
/**
|
||||||
* list of all the logger names and their levels
|
* list of all the logger names and their levels
|
||||||
*
|
*
|
||||||
* @returns a List of all known loggers names.
|
* @return a List of all known loggers names.
|
||||||
*
|
*
|
||||||
* @throws Exception if an error occurs while getting the loggers.
|
* @throws Exception if an error occurs while getting the loggers.
|
||||||
*/
|
*/
|
||||||
|
@ -61,7 +61,7 @@ public interface Log4JConfigViewMBean {
|
||||||
* @param loggerName
|
* @param loggerName
|
||||||
* the name of the logger whose level should be queried.
|
* the name of the logger whose level should be queried.
|
||||||
*
|
*
|
||||||
* @returns the current log level of the given logger.
|
* @return the current log level of the given logger.
|
||||||
*
|
*
|
||||||
* @throws Exception if an error occurs while getting the log level.
|
* @throws Exception if an error occurs while getting the log level.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -521,7 +521,7 @@ public class ManagementContext implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return
|
* @return an MBeanServer instance
|
||||||
* @throws NullPointerException
|
* @throws NullPointerException
|
||||||
* @throws MalformedObjectNameException
|
* @throws MalformedObjectNameException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
|
|
@ -67,13 +67,13 @@ public interface ProducerViewMBean {
|
||||||
boolean isDestinationTemporary();
|
boolean isDestinationTemporary();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @returns the windows size configured for the producer
|
* @return the windows size configured for the producer
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Configured Window Size for the Producer")
|
@MBeanInfo("Configured Window Size for the Producer")
|
||||||
int getProducerWindowSize();
|
int getProducerWindowSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @returns if the Producer is configured for Async dispatch
|
* @return if the Producer is configured for Async dispatch
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Is the producer configured for Async Dispatch")
|
@MBeanInfo("Is the producer configured for Async Dispatch")
|
||||||
boolean isDispatchAsync();
|
boolean isDispatchAsync();
|
||||||
|
|
|
@ -113,9 +113,8 @@ public interface SubscriptionViewMBean {
|
||||||
int getDispatchedQueueSize();
|
int getDispatchedQueueSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The same as the number of messages dispatched -
|
* The same as the number of messages dispatched - making it explicit
|
||||||
* making it explicit
|
* @return number of messages waiting for an acknowledge.
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
@MBeanInfo("Number of messages dispatched awaiting acknowledgement.")
|
@MBeanInfo("Number of messages dispatched awaiting acknowledgement.")
|
||||||
int getMessageCountAwaitingAcknowledge();
|
int getMessageCountAwaitingAcknowledge();
|
||||||
|
|
|
@ -548,7 +548,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
/**
|
/**
|
||||||
* Used to determine if the broker can dispatch to the consumer.
|
* Used to determine if the broker can dispatch to the consumer.
|
||||||
*
|
*
|
||||||
* @return
|
* @return true if the subscription is full
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean isFull() {
|
public boolean isFull() {
|
||||||
|
|
|
@ -50,7 +50,6 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
|
||||||
return s == this.exclusiveConsumer;
|
return s == this.exclusiveConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public boolean canSelect(Subscription subscription,
|
public boolean canSelect(Subscription subscription,
|
||||||
MessageReference m) throws Exception {
|
MessageReference m) throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
||||||
|
|
||||||
import javax.jms.InvalidSelectorException;
|
import javax.jms.InvalidSelectorException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
|
@ -66,7 +67,7 @@ public interface Subscription extends SubscriptionRecovery {
|
||||||
* Is the subscription interested in the message?
|
* Is the subscription interested in the message?
|
||||||
* @param node
|
* @param node
|
||||||
* @param context
|
* @param context
|
||||||
* @return
|
* @return true if matching
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
|
boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
|
||||||
|
@ -74,7 +75,7 @@ public interface Subscription extends SubscriptionRecovery {
|
||||||
/**
|
/**
|
||||||
* Is the subscription interested in messages in the destination?
|
* Is the subscription interested in messages in the destination?
|
||||||
* @param destination
|
* @param destination
|
||||||
* @return
|
* @return true if matching
|
||||||
*/
|
*/
|
||||||
boolean matches(ActiveMQDestination destination);
|
boolean matches(ActiveMQDestination destination);
|
||||||
|
|
||||||
|
|
|
@ -16,15 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.region.cursors;
|
package org.apache.activemq.broker.region.cursors;
|
||||||
|
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
|
||||||
import org.apache.activemq.broker.region.QueueMessageReference;
|
|
||||||
import org.apache.activemq.command.MessageId;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.region.MessageReference;
|
||||||
|
import org.apache.activemq.broker.region.QueueMessageReference;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstraction that keeps the correct order of messages that need to be dispatched
|
* An abstraction that keeps the correct order of messages that need to be dispatched
|
||||||
* to consumers, but also hides the fact that there might be redelivered messages that
|
* to consumers, but also hides the fact that there might be redelivered messages that
|
||||||
|
@ -60,7 +60,7 @@ public class QueueDispatchPendingList implements PendingList {
|
||||||
* @param message
|
* @param message
|
||||||
* The MessageReference that is to be added to this list.
|
* The MessageReference that is to be added to this list.
|
||||||
*
|
*
|
||||||
* @return
|
* @return the pending node.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public PendingNode addMessageFirst(MessageReference message) {
|
public PendingNode addMessageFirst(MessageReference message) {
|
||||||
|
@ -74,7 +74,7 @@ public class QueueDispatchPendingList implements PendingList {
|
||||||
* @param message
|
* @param message
|
||||||
* The MessageReference that is to be added to this list.
|
* The MessageReference that is to be added to this list.
|
||||||
*
|
*
|
||||||
* @return
|
* @return the pending node.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public PendingNode addMessageLast(MessageReference message) {
|
public PendingNode addMessageLast(MessageReference message) {
|
||||||
|
|
|
@ -18,15 +18,13 @@ package org.apache.activemq.memory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines the interface used to cache messages.
|
* Defines the interface used to cache messages.
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public interface Cache {
|
public interface Cache {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets an object that was previously <code>put</code> into this object.
|
* Gets an object that was previously <code>put</code> into this object.
|
||||||
*
|
*
|
||||||
* @param msgid
|
* @param key
|
||||||
* @return null if the object was not previously put or if the object has
|
* @return null if the object was not previously put or if the object has
|
||||||
* expired out of the cache.
|
* expired out of the cache.
|
||||||
*/
|
*/
|
||||||
|
@ -35,15 +33,15 @@ public interface Cache {
|
||||||
/**
|
/**
|
||||||
* Puts an object into the cache.
|
* Puts an object into the cache.
|
||||||
*
|
*
|
||||||
* @param messageID
|
* @param key
|
||||||
* @param message
|
* @param value
|
||||||
*/
|
*/
|
||||||
Object put(Object key, Object value);
|
Object put(Object key, Object value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes an object from the cache.
|
* Removes an object from the cache.
|
||||||
*
|
*
|
||||||
* @param messageID
|
* @param key
|
||||||
* @return the object associated with the key if it was still in the cache.
|
* @return the object associated with the key if it was still in the cache.
|
||||||
*/
|
*/
|
||||||
Object remove(Object key);
|
Object remove(Object key);
|
||||||
|
@ -57,7 +55,7 @@ public interface Cache {
|
||||||
/**
|
/**
|
||||||
* How big is the cache right now?
|
* How big is the cache right now?
|
||||||
*
|
*
|
||||||
* @return
|
* @return the size.
|
||||||
*/
|
*/
|
||||||
int size();
|
int size();
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,6 @@ public class CacheEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @param entry
|
|
||||||
* @return false if you are trying to remove the tail pointer.
|
* @return false if you are trying to remove the tail pointer.
|
||||||
*/
|
*/
|
||||||
public boolean remove() {
|
public boolean remove() {
|
||||||
|
@ -53,5 +51,4 @@ public class CacheEntry {
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets the LDAP server URI
|
* sets the LDAP server URI
|
||||||
*
|
*
|
||||||
* @param _uri
|
* @param uri
|
||||||
* LDAP server URI
|
* LDAP server URI
|
||||||
*/
|
*/
|
||||||
public void setUri(URI uri) throws Exception {
|
public void setUri(URI uri) throws Exception {
|
||||||
|
@ -114,7 +114,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets the base LDAP dn used for lookup operations
|
* sets the base LDAP dn used for lookup operations
|
||||||
*
|
*
|
||||||
* @param _base
|
* @param base
|
||||||
* LDAP base dn
|
* LDAP base dn
|
||||||
*/
|
*/
|
||||||
public void setBase(String base) {
|
public void setBase(String base) {
|
||||||
|
@ -124,7 +124,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets the LDAP user for access credentials
|
* sets the LDAP user for access credentials
|
||||||
*
|
*
|
||||||
* @param _user
|
* @param user
|
||||||
* LDAP dn of user
|
* LDAP dn of user
|
||||||
*/
|
*/
|
||||||
public void setUser(String user) {
|
public void setUser(String user) {
|
||||||
|
@ -134,9 +134,10 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets the LDAP password for access credentials
|
* sets the LDAP password for access credentials
|
||||||
*
|
*
|
||||||
* @param _password
|
* @param password
|
||||||
* user password
|
* user password
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setPassword(String password) {
|
public void setPassword(String password) {
|
||||||
this.password = password;
|
this.password = password;
|
||||||
}
|
}
|
||||||
|
@ -144,7 +145,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets LDAP anonymous authentication access credentials
|
* sets LDAP anonymous authentication access credentials
|
||||||
*
|
*
|
||||||
* @param _anonymousAuthentication
|
* @param anonymousAuthentication
|
||||||
* set to true to use anonymous authentication
|
* set to true to use anonymous authentication
|
||||||
*/
|
*/
|
||||||
public void setAnonymousAuthentication(boolean anonymousAuthentication) {
|
public void setAnonymousAuthentication(boolean anonymousAuthentication) {
|
||||||
|
@ -154,7 +155,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets the LDAP search scope
|
* sets the LDAP search scope
|
||||||
*
|
*
|
||||||
* @param _searchScope
|
* @param searchScope
|
||||||
* LDAP JNDI search scope
|
* LDAP JNDI search scope
|
||||||
*/
|
*/
|
||||||
public void setSearchScope(String searchScope) throws Exception {
|
public void setSearchScope(String searchScope) throws Exception {
|
||||||
|
@ -174,7 +175,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* sets the LDAP search filter as defined in RFC 2254
|
* sets the LDAP search filter as defined in RFC 2254
|
||||||
*
|
*
|
||||||
* @param _searchFilter
|
* @param searchFilter
|
||||||
* LDAP search filter
|
* LDAP search filter
|
||||||
* @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
|
* @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
|
||||||
*/
|
*/
|
||||||
|
@ -186,7 +187,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
* enables/disable a persistent search to the LDAP server as defined in
|
* enables/disable a persistent search to the LDAP server as defined in
|
||||||
* draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
|
* draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
|
||||||
*
|
*
|
||||||
* @param _searchEventListener
|
* @param searchEventListener
|
||||||
* enable = true, disable = false (default)
|
* enable = true, disable = false (default)
|
||||||
* @see <a
|
* @see <a
|
||||||
* href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
|
* href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
|
||||||
|
@ -198,6 +199,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* start the connector
|
* start the connector
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
LOG.info("connecting...");
|
LOG.info("connecting...");
|
||||||
Hashtable<String, String> env = new Hashtable<String, String>();
|
Hashtable<String, String> env = new Hashtable<String, String>();
|
||||||
|
@ -262,6 +264,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* stop the connector
|
* stop the connector
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
LOG.info("stopping context...");
|
LOG.info("stopping context...");
|
||||||
for (NetworkConnector connector : connectorMap.values()) {
|
for (NetworkConnector connector : connectorMap.values()) {
|
||||||
|
@ -273,6 +276,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
context.close();
|
context.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
|
return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
|
||||||
}
|
}
|
||||||
|
@ -395,6 +399,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* invoked when an entry has been added during a persistent search
|
* invoked when an entry has been added during a persistent search
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void objectAdded(NamingEvent event) {
|
public void objectAdded(NamingEvent event) {
|
||||||
LOG.debug("entry added");
|
LOG.debug("entry added");
|
||||||
try {
|
try {
|
||||||
|
@ -407,6 +412,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* invoked when an entry has been removed during a persistent search
|
* invoked when an entry has been removed during a persistent search
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void objectRemoved(NamingEvent event) {
|
public void objectRemoved(NamingEvent event) {
|
||||||
LOG.debug("entry removed");
|
LOG.debug("entry removed");
|
||||||
try {
|
try {
|
||||||
|
@ -419,6 +425,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* invoked when an entry has been renamed during a persistent search
|
* invoked when an entry has been renamed during a persistent search
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void objectRenamed(NamingEvent event) {
|
public void objectRenamed(NamingEvent event) {
|
||||||
LOG.debug("entry renamed");
|
LOG.debug("entry renamed");
|
||||||
// XXX: getNameInNamespace method does not seem to work properly,
|
// XXX: getNameInNamespace method does not seem to work properly,
|
||||||
|
@ -433,6 +440,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* invoked when an entry has been changed during a persistent search
|
* invoked when an entry has been changed during a persistent search
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void objectChanged(NamingEvent event) {
|
public void objectChanged(NamingEvent event) {
|
||||||
LOG.debug("entry changed");
|
LOG.debug("entry changed");
|
||||||
try {
|
try {
|
||||||
|
@ -447,6 +455,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
/**
|
/**
|
||||||
* invoked when an exception has occurred during a persistent search
|
* invoked when an exception has occurred during a persistent search
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void namingExceptionThrown(NamingExceptionEvent event) {
|
public void namingExceptionThrown(NamingExceptionEvent event) {
|
||||||
LOG.error("ERR: caught unexpected exception", event.getException());
|
LOG.error("ERR: caught unexpected exception", event.getException());
|
||||||
}
|
}
|
||||||
|
|
|
@ -260,7 +260,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param inboundMessageConvertor The inboundMessageConvertor to set.
|
* @param jmsMessageConvertor The jmsMessageConvertor to set.
|
||||||
*/
|
*/
|
||||||
public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
|
public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
|
||||||
this.inboundMessageConvertor = jmsMessageConvertor;
|
this.inboundMessageConvertor = jmsMessageConvertor;
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class ReconnectionPolicy {
|
||||||
* Sets the maximum number of a times a Message send should be retried before
|
* Sets the maximum number of a times a Message send should be retried before
|
||||||
* a JMSExeception is thrown indicating that the operation failed.
|
* a JMSExeception is thrown indicating that the operation failed.
|
||||||
*
|
*
|
||||||
* @param maxRetries
|
* @param maxSendRetries
|
||||||
* number of send retries that will be performed.
|
* number of send retries that will be performed.
|
||||||
*/
|
*/
|
||||||
public void setMaxSendRetries(int maxSendRetries) {
|
public void setMaxSendRetries(int maxSendRetries) {
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param localQueueConnectionFactory The localQueueConnectionFactory to
|
* @param localConnectionFactory The localQueueConnectionFactory to
|
||||||
* set.
|
* set.
|
||||||
*/
|
*/
|
||||||
public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
|
public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
|
||||||
|
@ -99,8 +99,8 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param outboundQueueConnectionFactoryName The
|
* @param foreignQueueConnectionFactoryName The
|
||||||
* outboundQueueConnectionFactoryName to set.
|
* foreignQueueConnectionFactoryName to set.
|
||||||
*/
|
*/
|
||||||
public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
|
public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
|
||||||
this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
|
this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
|
||||||
|
@ -142,15 +142,14 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param outboundQueueConnection The outboundQueueConnection to set.
|
* @param foreignQueueConnection The foreignQueueConnection to set.
|
||||||
*/
|
*/
|
||||||
public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
|
public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
|
||||||
this.foreignConnection.set(foreignQueueConnection);
|
this.foreignConnection.set(foreignQueueConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
|
* @param foreignQueueConnectionFactory The foreignQueueConnectionFactory to set.
|
||||||
* to set.
|
|
||||||
*/
|
*/
|
||||||
public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
|
public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
|
||||||
this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
|
this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
|
||||||
|
@ -166,7 +165,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
if (outboundQueueConnectionFactory == null) {
|
if (outboundQueueConnectionFactory == null) {
|
||||||
// look it up from JNDI
|
// look it up from JNDI
|
||||||
if (outboundQueueConnectionFactoryName != null) {
|
if (outboundQueueConnectionFactoryName != null) {
|
||||||
outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
|
outboundQueueConnectionFactory = jndiOutboundTemplate
|
||||||
.lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
|
.lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
|
||||||
if (outboundUsername != null) {
|
if (outboundUsername != null) {
|
||||||
newConnection = outboundQueueConnectionFactory
|
newConnection = outboundQueueConnectionFactory
|
||||||
|
@ -225,7 +224,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
if (embeddedConnectionFactory == null) {
|
if (embeddedConnectionFactory == null) {
|
||||||
// look it up from JNDI
|
// look it up from JNDI
|
||||||
if (localConnectionFactoryName != null) {
|
if (localConnectionFactoryName != null) {
|
||||||
localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
|
localQueueConnectionFactory = jndiLocalTemplate
|
||||||
.lookup(localConnectionFactoryName, QueueConnectionFactory.class);
|
.lookup(localConnectionFactoryName, QueueConnectionFactory.class);
|
||||||
if (localUsername != null) {
|
if (localUsername != null) {
|
||||||
newConnection = localQueueConnectionFactory
|
newConnection = localQueueConnectionFactory
|
||||||
|
@ -350,6 +349,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
|
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
|
||||||
Connection replyToConsumerConnection) {
|
Connection replyToConsumerConnection) {
|
||||||
Queue replyToProducerQueue = (Queue)destination;
|
Queue replyToProducerQueue = (Queue)destination;
|
||||||
|
@ -359,6 +359,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
|
InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new InboundQueueBridge() {
|
bridge = new InboundQueueBridge() {
|
||||||
|
@Override
|
||||||
protected Destination processReplyToDestination(Destination destination) {
|
protected Destination processReplyToDestination(Destination destination) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -390,6 +391,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
|
OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new OutboundQueueBridge() {
|
bridge = new OutboundQueueBridge() {
|
||||||
|
@Override
|
||||||
protected Destination processReplyToDestination(Destination destination) {
|
protected Destination processReplyToDestination(Destination destination) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -430,7 +432,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
if (preferJndiDestinationLookup) {
|
if (preferJndiDestinationLookup) {
|
||||||
try {
|
try {
|
||||||
// look-up the Queue
|
// look-up the Queue
|
||||||
result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
|
result = jndiOutboundTemplate.lookup(queueName, Queue.class);
|
||||||
} catch (NamingException e) {
|
} catch (NamingException e) {
|
||||||
try {
|
try {
|
||||||
result = session.createQueue(queueName);
|
result = session.createQueue(queueName);
|
||||||
|
@ -448,7 +450,7 @@ public class SimpleJmsQueueConnector extends JmsConnector {
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
// look-up the Queue
|
// look-up the Queue
|
||||||
try {
|
try {
|
||||||
result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
|
result = jndiOutboundTemplate.lookup(queueName, Queue.class);
|
||||||
} catch (NamingException e1) {
|
} catch (NamingException e1) {
|
||||||
String errStr = "Failed to look-up Queue for name: " + queueName;
|
String errStr = "Failed to look-up Queue for name: " + queueName;
|
||||||
LOG.error(errStr, e);
|
LOG.error(errStr, e);
|
||||||
|
|
|
@ -80,8 +80,8 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
/**
|
/**
|
||||||
* @param localTopicConnectionFactory The localTopicConnectionFactory to set.
|
* @param localTopicConnectionFactory The localTopicConnectionFactory to set.
|
||||||
*/
|
*/
|
||||||
public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
|
public void setLocalTopicConnectionFactory(TopicConnectionFactory localTopicConnectionFactory) {
|
||||||
this.localTopicConnectionFactory = localConnectionFactory;
|
this.localTopicConnectionFactory = localTopicConnectionFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,7 +99,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
|
* @param foreignTopicConnectionFactoryName The foreignTopicConnectionFactoryName to set.
|
||||||
*/
|
*/
|
||||||
public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
|
public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
|
||||||
this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
|
this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
|
||||||
|
@ -141,14 +141,14 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param outboundTopicConnection The outboundTopicConnection to set.
|
* @param foreignTopicConnection The foreignTopicConnection to set.
|
||||||
*/
|
*/
|
||||||
public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
|
public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
|
||||||
this.foreignConnection.set(foreignTopicConnection);
|
this.foreignConnection.set(foreignTopicConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
|
* @param foreignTopicConnectionFactory The foreignTopicConnectionFactory to set.
|
||||||
*/
|
*/
|
||||||
public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
|
public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
|
||||||
this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
|
this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
|
||||||
|
@ -164,7 +164,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
if (outboundTopicConnectionFactory == null) {
|
if (outboundTopicConnectionFactory == null) {
|
||||||
// look it up from JNDI
|
// look it up from JNDI
|
||||||
if (outboundTopicConnectionFactoryName != null) {
|
if (outboundTopicConnectionFactoryName != null) {
|
||||||
outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
|
outboundTopicConnectionFactory = jndiOutboundTemplate
|
||||||
.lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
|
.lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
|
||||||
if (outboundUsername != null) {
|
if (outboundUsername != null) {
|
||||||
newConnection = outboundTopicConnectionFactory
|
newConnection = outboundTopicConnectionFactory
|
||||||
|
@ -223,7 +223,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
if (embeddedConnectionFactory == null) {
|
if (embeddedConnectionFactory == null) {
|
||||||
// look it up from JNDI
|
// look it up from JNDI
|
||||||
if (localConnectionFactoryName != null) {
|
if (localConnectionFactoryName != null) {
|
||||||
localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
|
localTopicConnectionFactory = jndiLocalTemplate
|
||||||
.lookup(localConnectionFactoryName, TopicConnectionFactory.class);
|
.lookup(localConnectionFactoryName, TopicConnectionFactory.class);
|
||||||
if (localUsername != null) {
|
if (localUsername != null) {
|
||||||
newConnection = localTopicConnectionFactory
|
newConnection = localTopicConnectionFactory
|
||||||
|
@ -348,6 +348,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
|
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
|
||||||
Connection replyToConsumerConnection) {
|
Connection replyToConsumerConnection) {
|
||||||
Topic replyToProducerTopic = (Topic)destination;
|
Topic replyToProducerTopic = (Topic)destination;
|
||||||
|
@ -357,6 +358,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
|
InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new InboundTopicBridge() {
|
bridge = new InboundTopicBridge() {
|
||||||
|
@Override
|
||||||
protected Destination processReplyToDestination(Destination destination) {
|
protected Destination processReplyToDestination(Destination destination) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -388,6 +390,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
|
OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new OutboundTopicBridge() {
|
bridge = new OutboundTopicBridge() {
|
||||||
|
@Override
|
||||||
protected Destination processReplyToDestination(Destination destination) {
|
protected Destination processReplyToDestination(Destination destination) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -428,7 +431,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
if (preferJndiDestinationLookup) {
|
if (preferJndiDestinationLookup) {
|
||||||
try {
|
try {
|
||||||
// look-up the Queue
|
// look-up the Queue
|
||||||
result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
|
result = jndiOutboundTemplate.lookup(topicName, Topic.class);
|
||||||
} catch (NamingException e) {
|
} catch (NamingException e) {
|
||||||
try {
|
try {
|
||||||
result = session.createTopic(topicName);
|
result = session.createTopic(topicName);
|
||||||
|
@ -446,7 +449,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
// look-up the Topic
|
// look-up the Topic
|
||||||
try {
|
try {
|
||||||
result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
|
result = jndiOutboundTemplate.lookup(topicName, Topic.class);
|
||||||
} catch (NamingException e1) {
|
} catch (NamingException e1) {
|
||||||
String errStr = "Failed to look-up Topic for name: " + topicName;
|
String errStr = "Failed to look-up Topic for name: " + topicName;
|
||||||
LOG.error(errStr, e);
|
LOG.error(errStr, e);
|
||||||
|
@ -456,7 +459,7 @@ public class SimpleJmsTopicConnector extends JmsConnector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ class TopicBridge extends DestinationBridge {
|
||||||
protected TopicConnection consumerConnection;
|
protected TopicConnection consumerConnection;
|
||||||
protected TopicConnection producerConnection;
|
protected TopicConnection producerConnection;
|
||||||
|
|
||||||
|
@Override
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
super.stop();
|
super.stop();
|
||||||
if (consumerSession != null) {
|
if (consumerSession != null) {
|
||||||
|
@ -53,6 +54,7 @@ class TopicBridge extends DestinationBridge {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected MessageConsumer createConsumer() throws JMSException {
|
protected MessageConsumer createConsumer() throws JMSException {
|
||||||
// set up the consumer
|
// set up the consumer
|
||||||
if (consumerConnection == null) return null;
|
if (consumerConnection == null) return null;
|
||||||
|
@ -78,6 +80,7 @@ class TopicBridge extends DestinationBridge {
|
||||||
return consumer;
|
return consumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected synchronized MessageProducer createProducer() throws JMSException {
|
protected synchronized MessageProducer createProducer() throws JMSException {
|
||||||
if (producerConnection == null) return null;
|
if (producerConnection == null) return null;
|
||||||
producerSession = producerConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
producerSession = producerConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -85,6 +88,7 @@ class TopicBridge extends DestinationBridge {
|
||||||
return producer;
|
return producer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected synchronized void sendMessage(Message message) throws JMSException {
|
protected synchronized void sendMessage(Message message) throws JMSException {
|
||||||
if (producer == null && createProducer() == null) {
|
if (producer == null && createProducer() == null) {
|
||||||
throw new JMSException("Producer for remote queue not available.");
|
throw new JMSException("Producer for remote queue not available.");
|
||||||
|
@ -119,14 +123,14 @@ class TopicBridge extends DestinationBridge {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Returns the subscriptionName.
|
* @return Returns the consumerName.
|
||||||
*/
|
*/
|
||||||
public String getConsumerName() {
|
public String getConsumerName() {
|
||||||
return consumerName;
|
return consumerName;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param subscriptionName The subscriptionName to set.
|
* @param consumerName The consumerName to set.
|
||||||
*/
|
*/
|
||||||
public void setConsumerName(String consumerName) {
|
public void setConsumerName(String consumerName) {
|
||||||
this.consumerName = consumerName;
|
this.consumerName = consumerName;
|
||||||
|
@ -188,10 +192,12 @@ class TopicBridge extends DestinationBridge {
|
||||||
this.selector = selector;
|
this.selector = selector;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected Connection getConnnectionForConsumer() {
|
protected Connection getConnnectionForConsumer() {
|
||||||
return getConsumerConnection();
|
return getConsumerConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected Connection getConnectionForProducer() {
|
protected Connection getConnectionForProducer() {
|
||||||
return getProducerConnection();
|
return getProducerConnection();
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* @version 1.0
|
* @version 1.0
|
||||||
*/
|
*/
|
||||||
public class DiscardingDLQBrokerPlugin implements BrokerPlugin {
|
public class DiscardingDLQBrokerPlugin implements BrokerPlugin {
|
||||||
|
|
||||||
public DiscardingDLQBrokerPlugin() {
|
public DiscardingDLQBrokerPlugin() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,12 +42,16 @@ public class DiscardingDLQBrokerPlugin implements BrokerPlugin {
|
||||||
private int reportInterval = 1000;
|
private int reportInterval = 1000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Installs the plugin into the interceptor chain of the broker, returning the new intercepted broker to use.
|
* Installs the plugin into the intercepter chain of the broker, returning the new
|
||||||
|
* intercepted broker to use.
|
||||||
|
*
|
||||||
* @param broker Broker
|
* @param broker Broker
|
||||||
* @throws Exception
|
*
|
||||||
* @return Broker
|
* @return Broker
|
||||||
* @todo Implement this org.apache.activemq.broker.BrokerPlugin method
|
*
|
||||||
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public Broker installPlugin(Broker broker) throws Exception {
|
public Broker installPlugin(Broker broker) throws Exception {
|
||||||
log.info("Installing Discarding Dead Letter Queue broker plugin[dropAll={}; dropTemporaryTopics={}; dropTemporaryQueues={}; dropOnly={}; reportInterval={}]", new Object[]{
|
log.info("Installing Discarding Dead Letter Queue broker plugin[dropAll={}; dropTemporaryTopics={}; dropTemporaryQueues={}; dropOnly={}; reportInterval={}]", new Object[]{
|
||||||
isDropAll(), isDropTemporaryTopics(), isDropTemporaryQueues(), getDropOnly(), reportInterval
|
isDropAll(), isDropTemporaryTopics(), isDropTemporaryQueues(), getDropOnly(), reportInterval
|
||||||
|
|
|
@ -16,58 +16,60 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.plugin;
|
package org.apache.activemq.plugin;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Plugin which allows to force every incoming message to be PERSISTENT or NON-PERSISTENT.
|
* A Plugin which allows to force every incoming message to be PERSISTENT or
|
||||||
|
* NON-PERSISTENT.
|
||||||
*
|
*
|
||||||
* Useful, if you have set the broker usage policy to process ONLY persistent or ONLY non-persistent
|
* Useful, if you have set the broker usage policy to process ONLY persistent or
|
||||||
* messages.
|
* ONLY non-persistent messages.
|
||||||
* @org.apache.xbean.XBean element="forcePersistencyModeBrokerPlugin"
|
*
|
||||||
|
* @org.apache.xbean.XBean element="forcePersistencyModeBrokerPlugin"
|
||||||
*/
|
*/
|
||||||
public class ForcePersistencyModeBrokerPlugin implements BrokerPlugin {
|
public class ForcePersistencyModeBrokerPlugin implements BrokerPlugin {
|
||||||
private static Logger LOG = LoggerFactory.getLogger(ForcePersistencyModeBrokerPlugin.class);
|
private static Logger LOG = LoggerFactory.getLogger(ForcePersistencyModeBrokerPlugin.class);
|
||||||
private boolean persistenceFlag = false;
|
private boolean persistenceFlag = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*/
|
*/
|
||||||
public ForcePersistencyModeBrokerPlugin() {
|
public ForcePersistencyModeBrokerPlugin() {}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param broker
|
* @param broker
|
||||||
* @return the Broker
|
*
|
||||||
* @throws Exception
|
* @return the Broker
|
||||||
* @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
|
*
|
||||||
*/
|
* @throws Exception
|
||||||
|
*
|
||||||
|
* @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Broker installPlugin(Broker broker) throws Exception {
|
||||||
|
ForcePersistencyModeBroker pB = new ForcePersistencyModeBroker(broker);
|
||||||
|
pB.setPersistenceFlag(isPersistenceForced());
|
||||||
|
LOG.info("Installing ForcePersistencyModeBroker plugin: persistency enforced={}", pB.isPersistent());
|
||||||
|
return pB;
|
||||||
|
}
|
||||||
|
|
||||||
public Broker installPlugin(Broker broker) throws Exception{
|
/**
|
||||||
ForcePersistencyModeBroker pB = new ForcePersistencyModeBroker(broker);
|
* Sets the persistence mode.
|
||||||
pB.setPersistenceFlag(isPersistenceForced());
|
*
|
||||||
LOG.info("Installing ForcePersistencyModeBroker plugin: persistency enforced={}", pB.isPersistent());
|
* @param persistenceFlag
|
||||||
return pB;
|
*/
|
||||||
}
|
public void setPersistenceFlag(final boolean persistenceFlag) {
|
||||||
|
this.persistenceFlag = persistenceFlag;
|
||||||
/** Sets the persistency mode.
|
}
|
||||||
*
|
|
||||||
* @param persistenceFlag
|
|
||||||
*/
|
|
||||||
public void setPersistenceFlag(final boolean persistenceFlag) {
|
|
||||||
this.persistenceFlag = persistenceFlag;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the mode the (activated) plugin will set the message delivery mode
|
|
||||||
*/
|
|
||||||
public final boolean isPersistenceForced() {
|
|
||||||
return persistenceFlag;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the mode the (activated) plugin will set the message delivery
|
||||||
|
* mode
|
||||||
|
*/
|
||||||
|
public final boolean isPersistenceForced() {
|
||||||
|
return persistenceFlag;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,8 +52,9 @@ import org.slf4j.LoggerFactory;
|
||||||
* <p/>
|
* <p/>
|
||||||
* This is influenced by code snippets developed by Maciej Rakowicz
|
* This is influenced by code snippets developed by Maciej Rakowicz
|
||||||
*
|
*
|
||||||
* @see https://issues.apache.org/activemq/browse/AMQ-3004
|
* Refer to:
|
||||||
* @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
|
* https://issues.apache.org/activemq/browse/AMQ-3004
|
||||||
|
* http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
|
||||||
*/
|
*/
|
||||||
public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
|
public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
|
||||||
|
|
|
@ -16,13 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.plugin;
|
package org.apache.activemq.plugin;
|
||||||
|
|
||||||
|
import static org.apache.activemq.plugin.SubQueueSelectorCacheBroker.MAX_PERSIST_INTERVAL;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
|
|
||||||
import static org.apache.activemq.plugin.SubQueueSelectorCacheBroker.MAX_PERSIST_INTERVAL;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A plugin which allows the caching of the selector from a subscription queue.
|
* A plugin which allows the caching of the selector from a subscription queue.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
@ -31,8 +31,7 @@ import static org.apache.activemq.plugin.SubQueueSelectorCacheBroker.MAX_PERSIST
|
||||||
* <p/>
|
* <p/>
|
||||||
* This is influenced by code snippets developed by Maciej Rakowicz
|
* This is influenced by code snippets developed by Maciej Rakowicz
|
||||||
*
|
*
|
||||||
* @author Roelof Naude roelof(dot)naude(at)gmail.com
|
* @org.apache.xbean.XBean element="virtualSelectorCacheBrokerPlugin"
|
||||||
*@org.apache.xbean.XBean element="virtualSelectorCacheBrokerPlugin"
|
|
||||||
*/
|
*/
|
||||||
public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin {
|
public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin {
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class JaasCertificateAuthenticationBroker extends BrokerFilter implements
|
||||||
* Simple constructor. Leaves everything to superclass.
|
* Simple constructor. Leaves everything to superclass.
|
||||||
*
|
*
|
||||||
* @param next The Broker that does the actual work for this Filter.
|
* @param next The Broker that does the actual work for this Filter.
|
||||||
* @param jassConfiguration The JAAS domain configuration name (refere to
|
* @param jaasConfiguration The JAAS domain configuration name (refere to
|
||||||
* JAAS documentation).
|
* JAAS documentation).
|
||||||
*/
|
*/
|
||||||
public JaasCertificateAuthenticationBroker(Broker next, String jaasConfiguration) {
|
public JaasCertificateAuthenticationBroker(Broker next, String jaasConfiguration) {
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
package org.apache.activemq.store;
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.MessageAck;
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
|
@ -25,10 +27,9 @@ import org.apache.activemq.command.SubscriptionInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A MessageStore for durable topic subscriptions
|
* A MessageStore for durable topic subscriptions
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public interface TopicMessageStore extends MessageStore {
|
public interface TopicMessageStore extends MessageStore {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores the last acknowledged messgeID for the given subscription so that
|
* Stores the last acknowledged messgeID for the given subscription so that
|
||||||
* we can recover and commence dispatching messages from the last checkpoint
|
* we can recover and commence dispatching messages from the last checkpoint
|
||||||
|
@ -37,7 +38,8 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param messageId
|
* @param messageId
|
||||||
* @param subscriptionPersistentId
|
* @param ack
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException;
|
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException;
|
||||||
|
@ -45,7 +47,7 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
/**
|
/**
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param sub
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -61,7 +63,7 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param listener
|
* @param listener
|
||||||
* @param subscription
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
|
void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
|
||||||
|
@ -74,6 +76,7 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param maxReturned
|
* @param maxReturned
|
||||||
* @param listener
|
* @param listener
|
||||||
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
|
void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
|
||||||
|
@ -92,7 +95,9 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
*
|
*
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriberName
|
* @param subscriberName
|
||||||
|
*
|
||||||
* @return the outstanding message count
|
* @return the outstanding message count
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
int getMessageCount(String clientId, String subscriberName) throws IOException;
|
int getMessageCount(String clientId, String subscriberName) throws IOException;
|
||||||
|
@ -102,7 +107,9 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
*
|
*
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
|
*
|
||||||
* @return the SubscriptionInfo
|
* @return the SubscriptionInfo
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
|
SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
|
||||||
|
@ -123,10 +130,9 @@ public interface TopicMessageStore extends MessageStore {
|
||||||
* subscription without it having an acknowledged message so that on
|
* subscription without it having an acknowledged message so that on
|
||||||
* recovery, all message recorded for the topic get replayed.
|
* recovery, all message recorded for the topic get replayed.
|
||||||
*
|
*
|
||||||
* @param clientId
|
* @param subscriptionInfo
|
||||||
* @param subscriptionName
|
|
||||||
* @param selector
|
|
||||||
* @param retroactive
|
* @param retroactive
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
|
void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
|
||||||
|
|
|
@ -26,10 +26,9 @@ import org.apache.activemq.command.SubscriptionInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A MessageStore for durable topic subscriptions
|
* A MessageStore for durable topic subscriptions
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the last acknowledged messgeID for the given subscription so that
|
* Removes the last acknowledged messgeID for the given subscription so that
|
||||||
* we can recover and commence dispatching messages from the last checkpoint
|
* we can recover and commence dispatching messages from the last checkpoint
|
||||||
|
@ -40,8 +39,9 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param messageId
|
* @param messageId
|
||||||
* @param subscriptionPersistentId
|
*
|
||||||
* @return true if there are no more references to the message - or the message is null
|
* @return true if there are no more references to the message - or the message is null
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
|
boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
|
||||||
|
@ -49,10 +49,11 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
/**
|
/**
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param sub
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
void deleteSubscription(String clientId, String subscriptionName) throws IOException;
|
void deleteSubscription(String clientId, String subscriptionName) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,9 +66,10 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param listener
|
* @param listener
|
||||||
* @param subscription
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
|
void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -78,8 +80,10 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
* @param maxReturned
|
* @param maxReturned
|
||||||
* @param listener
|
* @param listener
|
||||||
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
|
void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -88,6 +92,7 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
void resetBatching(String clientId, String subscriptionName);
|
void resetBatching(String clientId, String subscriptionName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -96,9 +101,12 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
*
|
*
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriberName
|
* @param subscriberName
|
||||||
|
*
|
||||||
* @return the outstanding message count
|
* @return the outstanding message count
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
int getMessageCount(String clientId, String subscriberName) throws IOException;
|
int getMessageCount(String clientId, String subscriberName) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,17 +114,22 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
*
|
*
|
||||||
* @param clientId
|
* @param clientId
|
||||||
* @param subscriptionName
|
* @param subscriptionName
|
||||||
|
*
|
||||||
* @return the SubscriptionInfo
|
* @return the SubscriptionInfo
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
|
SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lists all the durable subscirptions for a given destination.
|
* Lists all the durable subscriptions for a given destination.
|
||||||
*
|
*
|
||||||
* @return an array SubscriptionInfos
|
* @return an array SubscriptionInfos
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
SubscriptionInfo[] getAllSubscriptions() throws IOException;
|
SubscriptionInfo[] getAllSubscriptions() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -127,10 +140,9 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
|
||||||
* subscription without it having an acknowledged message so that on
|
* subscription without it having an acknowledged message so that on
|
||||||
* recovery, all message recorded for the topic get replayed.
|
* recovery, all message recorded for the topic get replayed.
|
||||||
*
|
*
|
||||||
* @param clientId
|
* @param subscriptionInfo
|
||||||
* @param subscriptionName
|
|
||||||
* @param selector
|
|
||||||
* @param retroactive
|
* @param retroactive
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
|
void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
|
||||||
|
|
|
@ -16,14 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport;
|
package org.apache.activemq.transport;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
||||||
import org.apache.activemq.broker.jmx.ManagementContext;
|
import org.apache.activemq.broker.jmx.ManagementContext;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.LogWriterFinder;
|
import org.apache.activemq.util.LogWriterFinder;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import java.io.IOException;
|
|
||||||
import javax.management.ObjectName;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Singleton class to create TransportLogger objects.
|
* Singleton class to create TransportLogger objects.
|
||||||
|
@ -150,7 +152,7 @@ public class TransportLoggerFactory {
|
||||||
* @param dynamicManagement Specifies if JMX will be used to switch on/off the TransportLogger to be created.
|
* @param dynamicManagement Specifies if JMX will be used to switch on/off the TransportLogger to be created.
|
||||||
* @param startLogging Specifies if this TransportLogger should be initially active or not. Only has a meaning if
|
* @param startLogging Specifies if this TransportLogger should be initially active or not. Only has a meaning if
|
||||||
* dynamicManagement = true.
|
* dynamicManagement = true.
|
||||||
* @param jmxPort the port to be used by the JMX server. It should only be different from 1099 (broker's default JMX port)
|
* @param jmxport the port to be used by the JMX server. It should only be different from 1099 (broker's default JMX port)
|
||||||
* when it's a client that is using Transport Logging. In a broker, if the port is different from 1099, 2 JMX servers will
|
* when it's a client that is using Transport Logging. In a broker, if the port is different from 1099, 2 JMX servers will
|
||||||
* be created, both identical, with all the MBeans.
|
* be created, both identical, with all the MBeans.
|
||||||
* @return A TransportLogger object.
|
* @return A TransportLogger object.
|
||||||
|
|
|
@ -42,7 +42,7 @@ public final class HexSupport {
|
||||||
"f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff",
|
"f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff",
|
||||||
};
|
};
|
||||||
private static final int[] INT_OFFSETS = new int[]{
|
private static final int[] INT_OFFSETS = new int[]{
|
||||||
24,16,8,0
|
24,16,8,0
|
||||||
};
|
};
|
||||||
|
|
||||||
private HexSupport() {
|
private HexSupport() {
|
||||||
|
@ -50,7 +50,7 @@ public final class HexSupport {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param hex
|
* @param hex
|
||||||
* @return
|
* @return array of bytes
|
||||||
*/
|
*/
|
||||||
public static byte[] toBytesFromHex(String hex) {
|
public static byte[] toBytesFromHex(String hex) {
|
||||||
byte rc[] = new byte[hex.length() / 2];
|
byte rc[] = new byte[hex.length() / 2];
|
||||||
|
@ -64,7 +64,7 @@ public final class HexSupport {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param bytes
|
* @param bytes
|
||||||
* @return
|
* @return string hex value
|
||||||
*/
|
*/
|
||||||
public static String toHexFromBytes(byte[] bytes) {
|
public static String toHexFromBytes(byte[] bytes) {
|
||||||
StringBuffer rc = new StringBuffer(bytes.length * 2);
|
StringBuffer rc = new StringBuffer(bytes.length * 2);
|
||||||
|
@ -78,18 +78,18 @@ public final class HexSupport {
|
||||||
*
|
*
|
||||||
* @param value
|
* @param value
|
||||||
* @param trim if the leading 0's should be trimmed off.
|
* @param trim if the leading 0's should be trimmed off.
|
||||||
* @return
|
* @return string hex value
|
||||||
*/
|
*/
|
||||||
public static String toHexFromInt(int value, boolean trim) {
|
public static String toHexFromInt(int value, boolean trim) {
|
||||||
StringBuffer rc = new StringBuffer(INT_OFFSETS.length*2);
|
StringBuffer rc = new StringBuffer(INT_OFFSETS.length*2);
|
||||||
for (int i = 0; i < INT_OFFSETS.length; i++) {
|
for (int i = 0; i < INT_OFFSETS.length; i++) {
|
||||||
int b = 0xFF & (value>>INT_OFFSETS[i]);
|
int b = 0xFF & (value>>INT_OFFSETS[i]);
|
||||||
if( !(trim && b == 0) ) {
|
if( !(trim && b == 0) ) {
|
||||||
rc.append(HEX_TABLE[b]);
|
rc.append(HEX_TABLE[b]);
|
||||||
trim=false;
|
trim=false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rc.toString();
|
return rc.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ public final class IOHelper {
|
||||||
* and "." characters.
|
* and "." characters.
|
||||||
*
|
*
|
||||||
* @param name
|
* @param name
|
||||||
* @return
|
* @return safe name of the directory
|
||||||
*/
|
*/
|
||||||
public static String toFileSystemDirectorySafeName(String name) {
|
public static String toFileSystemDirectorySafeName(String name) {
|
||||||
return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
|
return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
|
||||||
|
@ -84,7 +84,7 @@ public final class IOHelper {
|
||||||
* @param name
|
* @param name
|
||||||
* @param dirSeparators
|
* @param dirSeparators
|
||||||
* @param maxFileLength
|
* @param maxFileLength
|
||||||
* @return
|
* @return file system safe name
|
||||||
*/
|
*/
|
||||||
public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
|
public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
|
||||||
int size = name.length();
|
int size = name.length();
|
||||||
|
|
|
@ -103,7 +103,6 @@ public class LinkedNode {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param leftHead the node to link after this node.
|
* @param leftHead the node to link after this node.
|
||||||
* @return
|
|
||||||
* @return this
|
* @return this
|
||||||
*/
|
*/
|
||||||
public LinkedNode linkBefore(LinkedNode leftHead) {
|
public LinkedNode linkBefore(LinkedNode leftHead) {
|
||||||
|
@ -154,5 +153,4 @@ public class LinkedNode {
|
||||||
prev = this;
|
prev = this;
|
||||||
tail = true;
|
tail = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -245,7 +245,7 @@ public class Activator implements BundleActivator, SynchronousBundleListener, Ob
|
||||||
* one of the packages of our interfaces
|
* one of the packages of our interfaces
|
||||||
*
|
*
|
||||||
* @param bundle
|
* @param bundle
|
||||||
* @return
|
* @return true if the bundle is improting.
|
||||||
*/
|
*/
|
||||||
private boolean isImportingUs(Bundle bundle) {
|
private boolean isImportingUs(Bundle bundle) {
|
||||||
BundleWiring wiring = bundle.adapt(BundleWiring.class);
|
BundleWiring wiring = bundle.adapt(BundleWiring.class);
|
||||||
|
|
Loading…
Reference in New Issue