More checkstyle fixes and Generics usage.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@564814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-08-11 00:49:19 +00:00
parent 5719351cb7
commit 933eb2f9e4
311 changed files with 2041 additions and 1743 deletions

View File

@ -106,7 +106,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
public final ConcurrentHashMap activeTempDestinations = new ConcurrentHashMap();
public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
protected boolean dispatchAsync;
protected boolean alwaysSessionAsync = true;
@ -151,14 +151,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean transportFailed = new AtomicBoolean(false);
private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList outputStreams = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList transportListeners = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
// Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap dispatchers = new ConcurrentHashMap();
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final SessionId connectionSessionId;
@ -194,7 +194,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Configure a single threaded executor who's core thread can timeout if
// idle
asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "AcitveMQ Connection Worker: " + transport);
thread.setDaemon(true);
@ -290,7 +290,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
boolean doSessionAsync = alwaysSessionAsync || sessions.size() > 0 || transacted || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), dispatchAsync, alwaysSessionAsync);
}
@ -453,8 +452,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = i.next();
session.start();
}
}
@ -494,8 +493,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void stop() throws JMSException {
checkClosedOrFailed();
if (started.compareAndSet(true, false)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.stop();
}
}
@ -560,20 +559,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
advisoryConsumer = null;
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
}
for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer)i.next();
for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = (ActiveMQInputStream)i.next();
for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next();
c.dispose();
}
for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = (ActiveMQOutputStream)i.next();
for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = i.next();
c.dispose();
}
@ -691,7 +690,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Allows the options on the destination to configure the consumerInfo
if (info.getDestination().getOptions() != null) {
HashMap options = new HashMap(info.getDestination().getOptions());
Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
IntrospectionSupport.setProperties(this.info, options, "consumer.");
}
@ -1076,7 +1075,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Allows the options on the destination to configure the consumerInfo
if (info.getDestination().getOptions() != null) {
HashMap options = new HashMap(info.getDestination().getOptions());
Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
IntrospectionSupport.setProperties(info, options, "consumer.");
}
@ -1324,20 +1323,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
advisoryConsumer = null;
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
}
for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer)i.next();
for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = (ActiveMQInputStream)i.next();
for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next();
c.dispose();
}
for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = (ActiveMQOutputStream)i.next();
for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = i.next();
c.dispose();
}
@ -1537,7 +1536,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md) throws Exception {
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)dispatchers.get(md.getConsumerId());
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
@ -1616,8 +1615,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener)iter.next();
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
}
}
@ -1660,8 +1659,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener)iter.next();
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
}
@ -1669,23 +1668,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public void transportInterupted() {
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener)iter.next();
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportInterupted();
}
}
public void transportResumed() {
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener)iter.next();
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportResumed();
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.deliverAcks();
}
}
@ -1726,8 +1725,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
checkClosedOrFailed();
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
if (s.isInUse(destination)) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
@ -1881,7 +1880,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* {@link javax.jms.Message#setObjectProperty(String, Object)}
* method
*/
public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
@ -1910,7 +1909,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(getConnectionInfo().getConnectionId());
rsi.setSubcriptionName(name);
rsi.setSubscriptionName(name);
rsi.setClientId(getConnectionInfo().getClientId());
syncSendPacket(rsi);
}
@ -1990,8 +1989,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void onConnectionControl(ConnectionControl command) {
if (command.isFaultTolerant()) {
this.optimizeAcknowledge = false;
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.setOptimizeAcknowledge(false);
}
}
@ -1999,13 +1998,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void onConsumerControl(ConsumerControl command) {
if (command.isClose()) {
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.close(command.getConsumerId());
}
} else {
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession)i.next();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
@ -550,10 +551,11 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
setBrokerURL(temp);
}
buildFromMap(properties);
Map<String, Object> p = new HashMap(properties);
buildFromMap(p);
}
public boolean buildFromMap(Map properties) {
public boolean buildFromMap(Map<String, Object> properties) {
boolean rc = false;
ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();

View File

@ -17,7 +17,7 @@
package org.apache.activemq;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -137,12 +137,12 @@ public final class ActiveMQConnectionMetaData implements ConnectionMetaData {
* @return an Enumeration of JMSX property names
*/
public Enumeration getJMSXPropertyNames() {
Hashtable jmxProperties = new Hashtable();
jmxProperties.put("JMSXGroupID", "1");
jmxProperties.put("JMSXGroupSeq", "1");
jmxProperties.put("JMSXDeliveryCount", "1");
jmxProperties.put("JMSXProducerTXID", "1");
return jmxProperties.keys();
public Enumeration<String> getJMSXPropertyNames() {
Vector<String> jmxProperties = new Vector<String>();
jmxProperties.add("JMSXGroupID");
jmxProperties.add("JMSXGroupSeq");
jmxProperties.add("JMSXDeliveryCount");
jmxProperties.add("JMSXProducerTXID");
return jmxProperties.elements();
}
}

View File

@ -138,8 +138,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @param dispatchAsync
* @throws JMSException
*/
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync) throws JMSException {
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync) throws JMSException {
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
} else if (dest.getPhysicalName() == null) {
@ -154,11 +156,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) < 0) {
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
throw new InvalidDestinationException(
"Cannot use a Temporary destination from another Connection");
}
if (session.connection.isDeleted(dest)) {
throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
throw new InvalidDestinationException(
"Cannot use a Temporary destination that has been deleted");
}
}
@ -199,7 +203,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !info.isBrowser();
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
try {
this.session.addConsumer(this);
@ -342,7 +347,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
throw new JMSException(
"Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
this.messageListener = listener;
if (listener != null) {
@ -709,15 +715,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
stats.onMessage();
if( session.isTransacted() ) {
if (session.isTransacted()) {
// Do nothing.
} else if(session.isAutoAcknowledge()) {
} else if (session.isAutoAcknowledge()) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
ackCounter++;
if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
deliveredMessages.size());
session.asyncSendPacket(ack);
ackCounter = 0;
deliveredMessages.clear();
@ -725,7 +732,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveryingAcknowledgements.set(false);
}
} else {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages
.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
}
@ -842,7 +850,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
rollbackCounter++;
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES && rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) {
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the
// DLQ.
// Acknowledge the last message.
@ -971,7 +980,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
public String toString() {
return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() + " }";
return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
+ " }";
}
/**

View File

@ -50,7 +50,10 @@ import org.apache.activemq.command.ActiveMQTopic;
*
* @version $Revision: 1.1 $
*/
public class ActiveMQMessageTransformation {
public final class ActiveMQMessageTransformation {
private ActiveMQMessageTransformation() {
}
/**
* Creates a an available JMS message from another provider.
@ -95,7 +98,7 @@ public class ActiveMQMessageTransformation {
* message.
* @throws JMSException if an error occurs
*/
public static final ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
throws JMSException {
if (message instanceof ActiveMQMessage) {
return (ActiveMQMessage)message;

View File

@ -20,6 +20,7 @@ import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
@ -79,17 +80,24 @@ public class AdvisoryConsumer implements ActiveMQDispatcher {
if (o != null && o.getClass() == DestinationInfo.class) {
processDestinationInfo((DestinationInfo)o);
} else {
connection.onAsyncException(new JMSException("Unexpected message was dispatched to the AdvisoryConsumer: " + md));
connection.onAsyncException(new JMSException(
"Unexpected message was dispatched to the AdvisoryConsumer: "
+ md));
}
}
private void processDestinationInfo(DestinationInfo dinfo) {
ActiveMQDestination dest = dinfo.getDestination();
if (!dest.isTemporary()) {
return;
}
ActiveMQTempDestination tempDest = (ActiveMQTempDestination)dest;
if (dinfo.getOperationType() == DestinationInfo.ADD_OPERATION_TYPE) {
connection.activeTempDestinations.put(dest, dest);
connection.activeTempDestinations.put(tempDest, tempDest);
} else if (dinfo.getOperationType() == DestinationInfo.REMOVE_OPERATION_TYPE) {
connection.activeTempDestinations.remove(dest);
connection.activeTempDestinations.remove(tempDest);
}
}

View File

@ -50,7 +50,7 @@ public interface StreamConnection extends Connection {
OutputStream createOutputStream(Destination dest) throws JMSException;
OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
/**
* Unsubscribes a durable subscription that has been created by a client.

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@ -53,10 +54,10 @@ public class AdvisoryBroker extends BrokerFilter {
private static final Log LOG = LogFactory.getLog(AdvisoryBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
protected final ConcurrentHashMap connections = new ConcurrentHashMap();
protected final ConcurrentHashMap consumers = new ConcurrentHashMap();
protected final ConcurrentHashMap producers = new ConcurrentHashMap();
protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ProducerId advisoryProducerId = new ProducerId();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@ -88,8 +89,8 @@ public class AdvisoryBroker extends BrokerFilter {
// for this newly added consumer.
if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
// Replay the connections.
for (Iterator iter = connections.values().iterator(); iter.hasNext();) {
ConnectionInfo value = (ConnectionInfo)iter.next();
for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
ConnectionInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
fireAdvisory(context, topic, value, info.getConsumerId());
}
@ -100,8 +101,8 @@ public class AdvisoryBroker extends BrokerFilter {
// for this newly added consumer.
if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
// Replay the destinations.
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
DestinationInfo value = (DestinationInfo)iter.next();
for (Iterator<DestinationInfo> iter = destinations.values().iterator(); iter.hasNext();) {
DestinationInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
fireAdvisory(context, topic, value, info.getConsumerId());
}
@ -109,8 +110,8 @@ public class AdvisoryBroker extends BrokerFilter {
// Replay the producers.
if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
for (Iterator iter = producers.values().iterator(); iter.hasNext();) {
ProducerInfo value = (ProducerInfo)iter.next();
for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
ProducerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
fireProducerAdvisory(context, topic, value, info.getConsumerId());
}
@ -118,8 +119,8 @@ public class AdvisoryBroker extends BrokerFilter {
// Replay the consumers.
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
for (Iterator iter = consumers.values().iterator(); iter.hasNext();) {
ConsumerInfo value = (ConsumerInfo)iter.next();
for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
fireConsumerAdvisory(context, topic, value, info.getConsumerId());
}
@ -163,7 +164,7 @@ public class AdvisoryBroker extends BrokerFilter {
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout);
DestinationInfo info = (DestinationInfo)destinations.remove(destination);
DestinationInfo info = destinations.remove(destination);
if (info != null) {
info.setDestination(destination);
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
@ -183,7 +184,7 @@ public class AdvisoryBroker extends BrokerFilter {
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
next.removeDestinationInfo(context, destInfo);
DestinationInfo info = (DestinationInfo)destinations.remove(destInfo.getDestination());
DestinationInfo info = destinations.remove(destInfo.getDestination());
if (info != null) {
info.setDestination(destInfo.getDestination());

View File

@ -21,7 +21,7 @@ import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
public class AdvisorySupport {
public final class AdvisorySupport {
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
@ -45,6 +45,9 @@ public class AdvisorySupport {
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
private AdvisorySupport() {
}
public static ActiveMQTopic getConnectionAdvisoryTopic() {
return CONNECTION_ADVISORY_TOPIC;
}

View File

@ -228,19 +228,19 @@ public class BrokerBroadcaster extends BrokerFilter {
}
public synchronized void addListener(Broker broker) {
List tmp = getListenersAsList();
List<Broker> tmp = getListenersAsList();
tmp.add(broker);
listeners = (Broker[])tmp.toArray(new Broker[tmp.size()]);
listeners = tmp.toArray(new Broker[tmp.size()]);
}
public synchronized void removeListener(Broker broker) {
List tmp = getListenersAsList();
List<Broker> tmp = getListenersAsList();
tmp.remove(broker);
listeners = (Broker[])tmp.toArray(new Broker[tmp.size()]);
listeners = tmp.toArray(new Broker[tmp.size()]);
}
protected List getListenersAsList() {
List tmp = new ArrayList();
protected List<Broker> getListenersAsList() {
List<Broker> tmp = new ArrayList<Broker>();
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
tmp.add(brokers[i]);

View File

@ -29,10 +29,13 @@ import org.apache.activemq.util.IOExceptionSupport;
*
* @version $Revision$
*/
public class BrokerFactory {
public final class BrokerFactory {
private static final FactoryFinder BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
private BrokerFactory() {
}
public static BrokerFactoryHandler createBrokerFactoryHandler(String type) throws IOException {
try {
return (BrokerFactoryHandler)BROKER_FACTORY_HANDLER_FINDER.newInstance(type);

View File

@ -61,7 +61,7 @@ public class BrokerFilter implements Broker {
return next.getAdaptor(type);
}
public Map getDestinationMap() {
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return next.getDestinationMap();
}
@ -205,7 +205,7 @@ public class BrokerFilter implements Broker {
return next.isStopped();
}
public Set getDurableDestinations() {
public Set<ActiveMQDestination> getDurableDestinations() {
return next.getDurableDestinations();
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.broker;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,7 +32,7 @@ public class BrokerRegistry {
private static final BrokerRegistry INSTANCE = new BrokerRegistry();
private final Object mutex = new Object();
private final HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
private final Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
public static BrokerRegistry getInstance() {
return INSTANCE;

View File

@ -63,7 +63,8 @@ public class EmptyBroker implements Broker {
return null;
}
public Map getDestinationMap() {
@SuppressWarnings("unchecked")
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return Collections.EMPTY_MAP;
}
@ -198,7 +199,7 @@ public class EmptyBroker implements Broker {
return false;
}
public Set getDurableDestinations() {
public Set<ActiveMQDestination> getDurableDestinations() {
return null;
}

View File

@ -55,7 +55,8 @@ public class ErrorBroker implements Broker {
this.message = message;
}
public Map getDestinationMap() {
@SuppressWarnings("unchecked")
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return Collections.EMPTY_MAP;
}
@ -203,7 +204,7 @@ public class ErrorBroker implements Broker {
return true;
}
public Set getDurableDestinations() {
public Set<ActiveMQDestination> getDurableDestinations() {
throw new BrokerStoppedException(this.message);
}

View File

@ -75,7 +75,7 @@ public class MutableBrokerFilter implements Broker {
}
}
public Map getDestinationMap() {
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return getNext().getDestinationMap();
}
@ -215,7 +215,7 @@ public class MutableBrokerFilter implements Broker {
return getNext().isStopped();
}
public Set getDurableDestinations() {
public Set<ActiveMQDestination> getDurableDestinations() {
return getNext().getDurableDestinations();
}

View File

@ -83,6 +83,7 @@ import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
@ -100,27 +101,31 @@ import org.apache.commons.logging.LogFactory;
public class TransportConnection implements Service, Connection, Task, CommandVisitor {
private static final Log LOG = LogFactory.getLog(TransportConnection.class);
private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName()
+ ".Transport");
private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
// Keeps track of the broker and connector that created this connection.
protected final Broker broker;
private MasterBroker masterBroker;
protected final TransportConnector connector;
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
// Keeps track of the state of the connections.
// protected final ConcurrentHashMap localConnectionStates=new
// ConcurrentHashMap();
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
protected final List<Command> dispatchQueue = Collections.synchronizedList(new LinkedList<Command>());
protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private MasterBroker masterBroker;
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private WireFormatInfo wireFormatInfo;
// Used to do async dispatch.. this should perhaps be pushed down into the
// transport layer..
protected final List<Command> dispatchQueue = Collections.synchronizedList(new LinkedList<Command>());
protected TaskRunner taskRunner;
protected final AtomicReference transportException = new AtomicReference();
private boolean inServiceException = false;
private boolean inServiceException;
private ConnectionStatistics statistics = new ConnectionStatistics();
private boolean manageable;
private boolean slow;
@ -140,7 +145,6 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private ConnectionContext context;
private boolean networkConnection;
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
@ -193,7 +197,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
* @param taskRunnerFactory - can be null if you want direct dispatch to the
* transport else commands are sent async.
*/
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory) {
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
TaskRunnerFactory taskRunnerFactory) {
this.connector = connector;
this.broker = broker;
RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
@ -275,7 +280,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
if (!disposed.get()) {
if (SERVICELOG.isDebugEnabled()) {
SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
SERVICELOG
.debug("Broker has been stopped. Notifying client and closing his connection.");
}
ConnectionError ce = new ConnectionError();
@ -408,7 +414,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
}
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState == null) {
throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + info.getTransactionId());
throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
+ info.getTransactionId());
}
// Avoid dups.
if (!transactionState.isPrepared()) {
@ -478,7 +485,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
public Response processMessageDispatchNotification(MessageDispatchNotification notification)
throws Exception {
broker.processDispatchNotification(notification);
return null;
}
@ -507,7 +515,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
throw new IllegalStateException(
"Cannot add a producer to a session that had not been registered: "
+ sessionId);
}
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) {
@ -527,7 +537,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
throw new IllegalStateException(
"Cannot remove a producer from a session that had not been registered: "
+ sessionId);
}
ProducerState ps = ss.removeProducer(id);
if (ps == null) {
@ -544,7 +556,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " + sessionId);
throw new IllegalStateException(
"Cannot add a consumer to a session that had not been registered: "
+ sessionId);
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
@ -564,7 +578,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
throw new IllegalStateException(
"Cannot remove a consumer from a session that had not been registered: "
+ sessionId);
}
ConsumerState consumerState = ss.removeConsumer(id);
if (consumerState == null) {
@ -644,7 +660,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
if (state.getConnection() != this) {
LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
state.getConnection().stop();
LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " + state.getConnection().getRemoteAddress());
LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
+ state.getConnection().getRemoteAddress());
state.setConnection(this);
state.reset(info);
}
@ -659,7 +676,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
context.setTransactions(new ConcurrentHashMap());
context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
context.setClientId(clientId);
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
@ -767,7 +784,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
}
protected void processDispatch(Command command) throws IOException {
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() ? command : null);
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
? command : null);
try {
if (!disposed.get()) {
if (messageDispatch != null) {
@ -847,7 +865,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
transport.start();
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress());
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
} else {
taskRunner = null;
}
@ -931,8 +950,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = (Command)iter.next();
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = iter.next();
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command;
Runnable sub = md.getTransmitCallback();
@ -1105,18 +1124,19 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// We have been requested to create a two way pipe ...
try {
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = new HashMap(properties);
Map<String, String> props = createMap(properties);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config, props, "");
config.setBrokerName(broker.getBrokerName());
URI uri = broker.getVmConnectorURI();
HashMap<String,String> map = new HashMap<String,String>(URISupport.parseParamters(uri));
HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport,
remoteBridgeTransport);
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
@ -1144,6 +1164,11 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null;
}
@SuppressWarnings("unchecked")
private HashMap<String, String> createMap(Properties properties) {
return new HashMap(properties);
}
protected void dispatch(Command command) throws IOException {
try {
setMarkedCandidate(true);
@ -1182,7 +1207,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
ProducerState producerState = ss.getProducerState(id);
if (producerState != null && producerState.getInfo() != null) {
ProducerInfo info = producerState.getInfo();
result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
result.setMutable(info.getDestination() == null
|| info.getDestination().isComposite());
}
}
producerExchanges.put(id, result);
@ -1287,7 +1313,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
//
// /////////////////////////////////////////////////////////////////
protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) {
protected TransportConnectionState registerConnectionState(ConnectionId connectionId,
TransportConnectionState state) {
TransportConnectionState rc = connectionState;
connectionState = state;
return rc;
@ -1310,7 +1337,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected TransportConnectionState lookupConnectionState(String connectionId) {
TransportConnectionState cs = connectionState;
if (cs == null) {
throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: " + connectionId);
throw new IllegalStateException(
"Cannot lookup a connectionId for a connection that had not been registered: "
+ connectionId);
}
return cs;
}
@ -1318,7 +1347,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected TransportConnectionState lookupConnectionState(ConsumerId id) {
TransportConnectionState cs = connectionState;
if (cs == null) {
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + id.getParentId().getParentId());
throw new IllegalStateException(
"Cannot lookup a consumer from a connection that had not been registered: "
+ id.getParentId().getParentId());
}
return cs;
}
@ -1326,7 +1357,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected TransportConnectionState lookupConnectionState(ProducerId id) {
TransportConnectionState cs = connectionState;
if (cs == null) {
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + id.getParentId().getParentId());
throw new IllegalStateException(
"Cannot lookup a producer from a connection that had not been registered: "
+ id.getParentId().getParentId());
}
return cs;
}
@ -1334,7 +1367,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected TransportConnectionState lookupConnectionState(SessionId id) {
TransportConnectionState cs = connectionState;
if (cs == null) {
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + id.getParentId());
throw new IllegalStateException(
"Cannot lookup a session from a connection that had not been registered: "
+ id.getParentId());
}
return cs;
}
@ -1342,7 +1377,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
TransportConnectionState cs = connectionState;
if (cs == null) {
throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + connectionId);
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "
+ connectionId);
}
return cs;
}

View File

@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFactory;
public class TransportStatusDetector implements Service, Runnable {
private static final Log LOG = LogFactory.getLog(TransportStatusDetector.class);
private TransportConnector connector;
private Set collectionCandidates = new CopyOnWriteArraySet();
private Set<TransportConnection> collectionCandidates = new CopyOnWriteArraySet<TransportConnection>();
private AtomicBoolean started = new AtomicBoolean(false);
private Thread runner;
private int sweepInterval = 5000;
@ -60,8 +60,8 @@ public class TransportStatusDetector implements Service, Runnable {
}
protected void doCollection() {
for (Iterator i = collectionCandidates.iterator(); i.hasNext();) {
TransportConnection tc = (TransportConnection)i.next();
for (Iterator<TransportConnection> i = collectionCandidates.iterator(); i.hasNext();) {
TransportConnection tc = i.next();
if (tc.isMarkedCandidate()) {
if (tc.isBlockedCandidate()) {
collectionCandidates.remove(tc);

View File

@ -69,7 +69,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
private ConnectionInfo connectionInfo;
private SessionInfo sessionInfo;
private ProducerInfo producerInfo;
final AtomicBoolean masterActive = new AtomicBoolean();
private final AtomicBoolean masterActive = new AtomicBoolean();
public MasterConnector() {
}

View File

@ -196,7 +196,7 @@ public class BrokerView implements BrokerViewMBean {
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
info.setSubcriptionName(subscriberName);
info.setSubscriptionName(subscriberName);
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);

View File

@ -128,7 +128,7 @@ public class DestinationView implements DestinationViewMBean {
public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
Message[] messages = destination.browse();
ArrayList c = new ArrayList();
ArrayList<CompositeData> c = new ArrayList<CompositeData>();
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
@ -159,7 +159,7 @@ public class DestinationView implements DestinationViewMBean {
/**
* Browses the current destination returning a list of messages
*/
public List browseMessages() throws InvalidSelectorException {
public List<Object> browseMessages() throws InvalidSelectorException {
return browseMessages(null);
}
@ -167,9 +167,9 @@ public class DestinationView implements DestinationViewMBean {
* Browses the current destination with the given selector returning a list
* of messages
*/
public List browseMessages(String selector) throws InvalidSelectorException {
public List<Object> browseMessages(String selector) throws InvalidSelectorException {
Message[] messages = destination.browse();
ArrayList answer = new ArrayList();
ArrayList<Object> answer = new ArrayList<Object>();
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());

View File

@ -78,7 +78,7 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
public void destroy() throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
info.setSubcriptionName(subscriptionName);
info.setSubscriptionName(subscriptionName);
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);

View File

@ -123,7 +123,7 @@ public class InactiveDurableSubscriptionView extends SubscriptionView implements
public void destroy() throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
info.setSubcriptionName(subscriptionInfo.getSubscriptionName());
info.setSubscriptionName(subscriptionInfo.getSubscriptionName());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);

View File

@ -320,7 +320,7 @@ public class ManagedRegionBroker extends RegionBroker {
SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId());
info.setSubcriptionName(subscriptionKey.getSubscriptionName());
info.setSubscriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info);
}
@ -420,9 +420,6 @@ public class ManagedRegionBroker extends RegionBroker {
throw new RuntimeException("Should not be called.");
}
public void finished() {
}
public boolean hasSpace() {
return true;
}

View File

@ -243,7 +243,7 @@ public abstract class AbstractRegion implements Region {
// inactive state for the
// destination which has reduced memory usage.
//
DestinationFilter destinationFilter = DestinationFilter.parseFilter(info.getDestination());
DestinationFilter.parseFilter(info.getDestination());
Subscription sub = createSubscription(context, info);

View File

@ -65,7 +65,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
this.broker = broker;
}
public Set getDestinations() {
public Set<ActiveMQDestination> getDestinations() {
return persistenceAdapter.getDestinations();
}

View File

@ -23,10 +23,12 @@ import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.SubscriptionKey;
@ -36,8 +38,8 @@ import org.apache.commons.logging.LogFactory;
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap();
private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private final UsageManager usageManager;
@ -82,7 +84,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
this.context = context;
this.info = info;
if (!keepDurableSubsActive) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
topic.activate(context, this);
}
@ -93,7 +95,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
// If nothing was in the persistent store, then try to use the
// recovery policy.
if (pending.isEmpty()) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
topic.recoverRetroactiveMessages(context, this);
}
@ -110,7 +112,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.stop();
}
if (!keepDurableSubsActive) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
topic.deactivate(context, this);
}
@ -118,7 +120,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
// Mark the dispatched messages as redelivered for next time.
MessageReference node = (MessageReference)iter.next();
Integer count = (Integer)redeliveredMessages.get(node.getMessageId());
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
} else {
@ -152,7 +154,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
Integer count = (Integer)redeliveredMessages.get(node.getMessageId());
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
md.setRedeliveryCounter(count.intValue());
}

View File

@ -114,8 +114,8 @@ public class RegionBroker implements Broker {
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
}
public Map getDestinationMap() {
Map answer = getQueueRegion().getDestinationMap();
public Map<ActiveMQDestination, Destination> getDestinationMap() {
Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
answer.putAll(getTopicRegion().getDestinationMap());
return answer;
}
@ -174,10 +174,6 @@ public class RegionBroker implements Broker {
return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException {
return new MemoryPersistenceAdapter();
}
public void start() throws Exception {
((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
started = true;
@ -575,7 +571,7 @@ public class RegionBroker implements Broker {
return !started;
}
public Set getDurableDestinations() {
public Set<ActiveMQDestination> getDurableDestinations() {
return destinationFactory.getDestinations();
}

View File

@ -276,10 +276,10 @@ public class TopicRegion extends AbstractRegion {
return !info1.getDestination().equals(info2.getDestination());
}
protected Set getInactiveDestinations() {
Set inactiveDestinations = super.getInactiveDestinations();
for (Iterator iter = inactiveDestinations.iterator(); iter.hasNext();) {
ActiveMQDestination dest = (ActiveMQDestination)iter.next();
protected Set<ActiveMQDestination> getInactiveDestinations() {
Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
ActiveMQDestination dest = iter.next();
if (!dest.isTopic()) {
iter.remove();
}

View File

@ -46,8 +46,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private Store store;
private String name;
private LinkedList memoryList = new LinkedList();
private ListContainer diskList;
private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
private ListContainer<MessageReference> diskList;
private Iterator iter;
private Destination regionDestination;
private boolean iterating;
@ -105,7 +105,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public synchronized void destroy() {
stop();
for (Iterator i = memoryList.iterator(); i.hasNext();) {
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
Message node = (Message)i.next();
node.decrementReferenceCount();
}
@ -115,15 +115,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
}
public synchronized LinkedList pageInList(int maxItems) {
LinkedList result = new LinkedList();
public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new LinkedList<MessageReference>();
int count = 0;
for (Iterator i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
result.add(i.next());
count++;
}
if (count < maxItems && !isDiskListEmpty()) {
for (Iterator i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
@ -262,7 +262,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected synchronized void flushToDisk() {
if (!memoryList.isEmpty()) {
while (!memoryList.isEmpty()) {
MessageReference node = (MessageReference)memoryList.removeFirst();
MessageReference node = memoryList.removeFirst();
node.decrementReferenceCount();
getDiskList().addLast(node);
}
@ -274,7 +274,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return diskList == null || diskList.isEmpty();
}
protected ListContainer getDiskList() {
protected ListContainer<MessageReference> getDiskList() {
if (diskList == null) {
try {
diskList = store.getListContainer(name, "TopicSubscription", true);

View File

@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
public class StoreQueueCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
private int pendingCount = 0;
private int pendingCount;
private Queue queue;
private Store tmpStore;
private PendingMessageCursor nonPersistent;

View File

@ -30,25 +30,25 @@ import org.apache.activemq.command.ConsumerId;
* @version $Revision$
*/
public class SimpleMessageGroupMap implements MessageGroupMap {
private Map map = new ConcurrentHashMap();
private Map<String, ConsumerId> map = new ConcurrentHashMap<String, ConsumerId>();
public void put(String groupId, ConsumerId consumerId) {
map.put(groupId, consumerId);
}
public ConsumerId get(String groupId) {
return (ConsumerId) map.get(groupId);
return map.get(groupId);
}
public ConsumerId removeGroup(String groupId) {
return (ConsumerId) map.remove(groupId);
return map.remove(groupId);
}
public MessageGroupSet removeConsumer(ConsumerId consumerId) {
SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
for (Iterator iter = map.keySet().iterator(); iter.hasNext();) {
String group = (String) iter.next();
ConsumerId owner = (ConsumerId) map.get(group);
for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
String group = iter.next();
ConsumerId owner = map.get(group);
if (owner.equals(consumerId)) {
ownedGroups.add(group);
iter.remove();

View File

@ -26,7 +26,7 @@ import java.util.Set;
*/
public class SimpleMessageGroupSet implements MessageGroupSet {
private Set set = new HashSet();
private Set<String> set = new HashSet<String>();
public boolean contains(String groupID) {
return set.contains(groupID);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region.policy;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
@ -43,6 +44,6 @@ public interface DispatchPolicy {
*
* @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched
*/
boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception;
boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception;
}

View File

@ -57,12 +57,12 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery
}
public Message[] browse(ActiveMQDestination destination) throws Exception {
List result = new ArrayList();
List<Message> result = new ArrayList<Message>();
DestinationFilter filter = DestinationFilter.parseFilter(destination);
if (filter.matches(lastImage.getMessage().getDestination())) {
result.add(lastImage.getMessage());
}
return (Message[])result.toArray(new Message[result.size()]);
return result.toArray(new Message[result.size()]);
}
public SubscriptionRecoveryPolicy copy() {

View File

@ -43,7 +43,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
* @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference,
* org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception {
// Big synch here so that only 1 message gets dispatched at a time.
// Ensures
@ -54,8 +54,8 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
int count = 0;
Subscription firstMatchingConsumer = null;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription)iter.next();
for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = iter.next();
// Only dispatch to interested subscriptions
if (!sub.matches(node, msgContext)) {

View File

@ -45,7 +45,7 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
// TODO: need to get a better synchronized linked list that has little
// contention between enqueuing and dequeuing
private final List buffer = Collections.synchronizedList(new LinkedList());
private final List<TimestampWrapper> buffer = Collections.synchronizedList(new LinkedList<TimestampWrapper>());
private volatile long lastGCRun = System.currentTimeMillis();
private long recoverDuration = 60 * 1000; // Buffer for 1 min.
@ -79,11 +79,10 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
// Re-dispatch the messages from the buffer.
ArrayList copy = new ArrayList(buffer);
ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer);
if (!copy.isEmpty()) {
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
for (Iterator iter = copy.iterator(); iter.hasNext();) {
TimestampWrapper timestampWrapper = (TimestampWrapper)iter.next();
for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) {
TimestampWrapper timestampWrapper = iter.next();
MessageReference message = timestampWrapper.message;
sub.addRecoveredMessage(context, message);
}
@ -101,7 +100,7 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
public void gc() {
lastGCRun = System.currentTimeMillis();
while (buffer.size() > 0) {
TimestampWrapper timestampWrapper = (TimestampWrapper)buffer.get(0);
TimestampWrapper timestampWrapper = buffer.get(0);
if (lastGCRun > timestampWrapper.timestamp + recoverDuration) {
// GC it.
buffer.remove(0);
@ -120,18 +119,18 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
}
public Message[] browse(ActiveMQDestination destination) throws Exception {
List result = new ArrayList();
ArrayList copy = new ArrayList(buffer);
List<Message> result = new ArrayList<Message>();
ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer);
DestinationFilter filter = DestinationFilter.parseFilter(destination);
for (Iterator iter = copy.iterator(); iter.hasNext();) {
TimestampWrapper timestampWrapper = (TimestampWrapper)iter.next();
for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) {
TimestampWrapper timestampWrapper = iter.next();
MessageReference ref = timestampWrapper.message;
Message message = ref.getMessage();
if (filter.matches(message.getDestination())) {
result.add(message);
}
}
return (Message[])result.toArray(new Message[result.size()]);
return result.toArray(new Message[result.size()]);
}
}

View File

@ -58,7 +58,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
// until we have some MBeans for producers, lets do it all ourselves
private Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
private Map<ProducerId, Set> producerDestinations = new HashMap<ProducerId, Set>();
private Map<ProducerId, Set<ActiveMQDestination>> producerDestinations = new HashMap<ProducerId, Set<ActiveMQDestination>>();
private Object lock = new Object();
public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws MalformedObjectNameException {
@ -267,7 +267,8 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
*/
protected String asID(String name) {
StringBuffer buffer = new StringBuffer();
for (int i = 0, size = name.length(); i < size; i++) {
int size = name.length();
for (int i = 0; i < size; i++) {
char ch = name.charAt(i);
if (Character.isLetterOrDigit(ch) || ch == '_') {
buffer.append(ch);

View File

@ -324,9 +324,10 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da
out.writeObject(options);
}
@SuppressWarnings("unchecked")
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.setPhysicalName(in.readUTF());
this.options = (Map)in.readObject();
this.options = (Map<String, String>)in.readObject();
}
public String getDestinationTypeAsString() {

View File

@ -29,12 +29,12 @@ public class ControlCommand extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONTROL_COMMAND;
private String command;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
private String command;
/**
* @openwire:property version=1
*/

View File

@ -26,10 +26,6 @@ public class DiscoveryEvent implements DataStructure {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DISCOVERY_EVENT;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
protected String serviceName;
protected String brokerName;
@ -40,6 +36,10 @@ public class DiscoveryEvent implements DataStructure {
this.serviceName = serviceName;
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
/**
* @openwire:property version=1
*/

View File

@ -35,10 +35,6 @@ public class JournalTransaction implements DataStructure {
public boolean wasPrepared;
public TransactionId transactionId;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
public JournalTransaction(byte type, TransactionId transactionId, boolean wasPrepared) {
this.type = type;
this.transactionId = transactionId;
@ -48,6 +44,10 @@ public class JournalTransaction implements DataStructure {
public JournalTransaction() {
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
/**
* @openwire:property version=1
*/

View File

@ -32,10 +32,6 @@ public class RemoveInfo extends BaseCommand {
protected DataStructure objectId;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
public RemoveInfo() {
}
@ -43,6 +39,10 @@ public class RemoveInfo extends BaseCommand {
this.objectId = objectId;
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
/**
* @openwire:property version=1 cache=true
*/

View File

@ -44,6 +44,10 @@ public class ConstantExpression implements Expression {
private Object value;
public ConstantExpression(Object value) {
this.value = value;
}
public static ConstantExpression createFromDecimal(String text) {
// Strip off the 'l' or 'L' if needed.
@ -89,10 +93,6 @@ public class ConstantExpression implements Expression {
return new ConstantExpression(value);
}
public ConstantExpression(Object value) {
this.value = value;
}
public Object evaluate(MessageEvaluationContext message) throws JMSException {
return value;
}

View File

@ -164,7 +164,8 @@ public class DestinationMapNode implements DestinationNode {
public void removeAll(Set answer, String[] paths, int startIndex) {
DestinationNode node = this;
for (int i = startIndex, size = paths.length; i < size && node != null; i++) {
int size = paths.length;
for (int i = startIndex; i < size && node != null; i++) {
String path = paths[i];
if (path.equals(ANY_DESCENDENT)) {
@ -230,7 +231,8 @@ public class DestinationMapNode implements DestinationNode {
public void appendMatchingValues(Set answer, String[] paths, int startIndex) {
DestinationNode node = this;
boolean couldMatchAny = true;
for (int i = startIndex, size = paths.length; i < size && node != null; i++) {
int size = paths.length;
for (int i = startIndex; i < size && node != null; i++) {
String path = paths[i];
if (path.equals(ANY_DESCENDENT)) {
answer.addAll(node.getDesendentValues());

View File

@ -30,9 +30,12 @@ import org.apache.activemq.command.Message;
*
* @version $Revision: 1.3 $
*/
public class DestinationPath {
public final class DestinationPath {
protected static final char SEPARATOR = '.';
private DestinationPath() {
}
public static String[] getDestinationPaths(String subject) {
List<String> list = new ArrayList<String>();
int previous = 0;

View File

@ -42,7 +42,8 @@ public class PrefixDestinationFilter extends DestinationFilter {
String[] path = DestinationPath.getDestinationPaths(destination.getPhysicalName());
int length = prefixes.length;
if (path.length >= length) {
for (int i = 0, size = length - 1; i < size; i++) {
int size = length - 1;
for (int i = 0; i < size; i++) {
if (!prefixes[i].equals(path[i])) {
return false;
}

View File

@ -48,7 +48,7 @@ public class WildcardDestinationFilter extends DestinationFilter {
String[] path = DestinationPath.getDestinationPaths(destination);
int length = prefixes.length;
if (path.length == length) {
for (int i = 0, size = length; i < size; i++) {
for (int i = 0; i < length; i++) {
String prefix = prefixes[i];
if (prefix != null && !prefix.equals(path[i])) {
return false;

View File

@ -55,7 +55,7 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
public Context getInitialContext(Hashtable environment) throws NamingException {
// lets create a factory
Map data = new ConcurrentHashMap();
Map<String, Object> data = new ConcurrentHashMap<String, Object>();
String[] names = getConnectionFactoryNames(environment);
for (int i = 0; i < names.length; i++) {
ActiveMQConnectionFactory factory = null;
@ -120,7 +120,7 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
// Implementation methods
// -------------------------------------------------------------------------
protected ReadOnlyContext createContext(Hashtable environment, Map data) {
protected ReadOnlyContext createContext(Hashtable environment, Map<String, Object> data) {
return new ReadOnlyContext(environment, data);
}
@ -143,7 +143,7 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
protected String[] getConnectionFactoryNames(Map environment) {
String factoryNames = (String)environment.get("connectionFactoryNames");
if (factoryNames != null) {
List list = new ArrayList();
List<String> list = new ArrayList<String>();
for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens();) {
list.add(enumeration.nextToken().trim());
}
@ -157,7 +157,7 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
return DEFAULT_CONNECTION_FACTORY_NAMES;
}
protected void createQueues(Map data, Hashtable environment) {
protected void createQueues(Map<String, Object> data, Hashtable environment) {
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry)iter.next();
String key = entry.getKey().toString();
@ -168,7 +168,7 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
}
}
protected void createTopics(Map data, Hashtable environment) {
protected void createTopics(Map<String, Object> data, Hashtable environment) {
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry)iter.next();
String key = entry.getKey().toString();

View File

@ -59,6 +59,7 @@ import javax.naming.spi.NamingManager;
*
* @version $Revision: 1.2 $ $Date: 2005/08/27 03:52:39 $
*/
@SuppressWarnings("unchecked")
public class ReadOnlyContext implements Context, Serializable {
public static final String SEPARATOR = "/";

View File

@ -25,7 +25,10 @@ import org.apache.activemq.kaha.impl.KahaStore;
*
* @version $Revision: 1.2 $
*/
public class StoreFactory {
public final class StoreFactory {
private StoreFactory() {
}
/**
* open or create a Store

View File

@ -53,8 +53,12 @@ import org.apache.commons.logging.LogFactory;
public class KahaStore implements Store {
private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX + ".FileLockBroken", "false"));
private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX + ".DisableLocking", "false"));
private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".FileLockBroken",
"false"));
private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".DisableLocking",
"false"));
private static final Log LOG = LogFactory.getLog(KahaStore.class);
@ -73,7 +77,7 @@ public class KahaStore implements Store {
private boolean useAsyncDataManager;
private long maxDataFileLength = 1024 * 1024 * 32;
private FileLock lock;
private boolean persistentIndex=true;
private boolean persistentIndex = true;
private RandomAccessFile lockFile;
public KahaStore(String name, String mode) throws IOException {
@ -196,7 +200,8 @@ public class KahaStore implements Store {
return getMapContainer(id, containerName, persistentIndex);
}
public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex) throws IOException {
public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
throws IOException {
initialize();
ContainerId containerId = new ContainerId();
containerId.setKey(id);
@ -264,7 +269,8 @@ public class KahaStore implements Store {
return getListContainer(id, containerName, persistentIndex);
}
public synchronized ListContainer getListContainer(Object id, String containerName, boolean persistentIndex) throws IOException {
public synchronized ListContainer getListContainer(Object id, String containerName,
boolean persistentIndex) throws IOException {
initialize();
ContainerId containerId = new ContainerId();
containerId.setKey(id);
@ -461,13 +467,15 @@ public class KahaStore implements Store {
if (!BROKEN_FILE_LOCK) {
lock = lockFile.getChannel().tryLock();
if (lock == null) {
throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application");
throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
+ " is already opened by another application");
} else {
System.setProperty(key, new Date().toString());
}
}
} else { // already locked
throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
+ " is already opened by this application.");
}
}
}

View File

@ -159,7 +159,7 @@ public abstract class BaseContainerImpl {
protected final void doClear() {
checkClosed();
loaded = true;
List indexList = new ArrayList();
List<IndexItem> indexList = new ArrayList<IndexItem>();
try {
init();
long nextItem = root.getNextItem();
@ -172,7 +172,7 @@ public abstract class BaseContainerImpl {
root.setNextItem(Item.POSITION_NOT_SET);
storeIndex(root);
for (int i = 0; i < indexList.size(); i++) {
IndexItem item = (IndexItem)indexList.get(i);
IndexItem item = indexList.get(i);
dataManager.removeInterestInFile(item.getKeyFile());
dataManager.removeInterestInFile(item.getValueFile());
indexManager.freeIndex(item);

View File

@ -71,7 +71,7 @@ public class ContainerEntrySet extends ContainerCollectionSupport implements Set
}
public boolean retainAll(Collection c) {
List tmpList = new ArrayList();
List<Object> tmpList = new ArrayList<Object>();
for (Iterator i = c.iterator(); i.hasNext();) {
Object o = i.next();
if (!contains(o)) {
@ -79,7 +79,7 @@ public class ContainerEntrySet extends ContainerCollectionSupport implements Set
}
}
boolean result = false;
for (Iterator i = tmpList.iterator(); i.hasNext();) {
for (Iterator<Object> i = tmpList.iterator(); i.hasNext();) {
result |= remove(i.next());
}
return result;
@ -99,8 +99,8 @@ public class ContainerEntrySet extends ContainerCollectionSupport implements Set
container.clear();
}
protected Set buildEntrySet() {
Set set = new HashSet();
protected Set<ContainerMapEntry> buildEntrySet() {
Set<ContainerMapEntry> set = new HashSet<ContainerMapEntry>();
for (Iterator i = container.keySet().iterator(); i.hasNext();) {
ContainerMapEntry entry = new ContainerMapEntry(container, i.next());
set.add(entry);

View File

@ -44,7 +44,7 @@ public class ContainerKeySet extends ContainerCollectionSupport implements Set {
}
public Object[] toArray() {
List list = new ArrayList();
List<Object> list = new ArrayList<Object>();
IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) {
list.add(container.getKey(item));
@ -53,7 +53,7 @@ public class ContainerKeySet extends ContainerCollectionSupport implements Set {
}
public Object[] toArray(Object[] a) {
List list = new ArrayList();
List<Object> list = new ArrayList<Object>();
IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) {
list.add(container.getKey(item));
@ -83,14 +83,14 @@ public class ContainerKeySet extends ContainerCollectionSupport implements Set {
}
public boolean retainAll(Collection c) {
List tmpList = new ArrayList();
List<Object> tmpList = new ArrayList<Object>();
for (Iterator i = c.iterator(); i.hasNext();) {
Object o = i.next();
if (!contains(o)) {
tmpList.add(o);
}
}
for (Iterator i = tmpList.iterator(); i.hasNext();) {
for (Iterator<Object> i = tmpList.iterator(); i.hasNext();) {
remove(i.next());
}
return !tmpList.isEmpty();

View File

@ -112,14 +112,14 @@ class ContainerValueCollection extends ContainerCollectionSupport implements Col
}
public boolean retainAll(Collection c) {
List tmpList = new ArrayList();
List<Object> tmpList = new ArrayList<Object>();
for (Iterator i = c.iterator(); i.hasNext();) {
Object o = i.next();
if (!contains(o)) {
tmpList.add(o);
}
}
for (Iterator i = tmpList.iterator(); i.hasNext();) {
for (Iterator<Object> i = tmpList.iterator(); i.hasNext();) {
remove(i.next());
}
return !tmpList.isEmpty();

View File

@ -246,7 +246,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*/
public synchronized Object[] toArray() {
load();
List tmp = new ArrayList(indexList.size());
List<Object> tmp = new ArrayList<Object>(indexList.size());
IndexItem next = indexList.getFirst();
while (next != null) {
Object value = getValue(next);
@ -263,7 +263,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*/
public synchronized Object[] toArray(Object[] a) {
load();
List tmp = new ArrayList(indexList.size());
List<Object> tmp = new ArrayList<Object>(indexList.size());
IndexItem next = indexList.getFirst();
while (next != null) {
Object value = getValue(next);
@ -384,7 +384,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*/
public synchronized boolean retainAll(Collection c) {
load();
List tmpList = new ArrayList();
List<Object> tmpList = new ArrayList<Object>();
IndexItem next = indexList.getFirst();
while (next != null) {
Object o = getValue(next);
@ -393,7 +393,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
next = indexList.getNextEntry(next);
}
for (Iterator i = tmpList.iterator(); i.hasNext();) {
for (Iterator<Object> i = tmpList.iterator(); i.hasNext();) {
remove(i.next());
}
return !tmpList.isEmpty();
@ -619,9 +619,9 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#subList(int, int)
*/
public synchronized List subList(int fromIndex, int toIndex) {
public synchronized List<Object> subList(int fromIndex, int toIndex) {
load();
List result = new ArrayList();
List<Object> result = new ArrayList<Object>();
int count = fromIndex;
IndexItem next = indexList.get(fromIndex);
while (next != null && count++ < toIndex) {

View File

@ -251,15 +251,15 @@ class HashBin {
return result;
}
private int getInsertPageNo(int index) {
int result = index / maximumEntries;
return result;
}
private int getOffset(int index) {
int result = index % maximumEntries;
return result;
}
// private int getInsertPageNo(int index) {
// int result = index / maximumEntries;
// return result;
// }
//
// private int getOffset(int index) {
// int result = index % maximumEntries;
// return result;
// }
private void doOverFlow(int index) throws IOException {
int pageNo = index / maximumEntries;

View File

@ -34,8 +34,9 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.1.1.1 $
*/
class TreePage {
private static final transient Log LOG = LogFactory.getLog(TreePage.class);
static final int PAGE_HEADER_SIZE = 18;
private static final transient Log LOG = LogFactory.getLog(TreePage.class);
static enum Flavour {
LESS, MORE
@ -193,7 +194,8 @@ class TreePage {
void setParentId(long newId) throws IOException {
if (newId == this.id) {
throw new IllegalStateException("Cannot set page as a child of itself " + this + " trying to set parentId = " + newId);
throw new IllegalStateException("Cannot set page as a child of itself " + this
+ " trying to set parentId = " + newId);
}
this.parentId = newId;
tree.writePage(this);
@ -362,7 +364,7 @@ class TreePage {
newRoot.setLeaf(false);
this.setParentId(newRoot.getId());
save(); // we are no longer root - need to save - we maybe
// looked up v. soon!
// looked up v. soon!
TreePage rightPage = tree.createPage(newRoot.getId());
rightPage.setEntries(subList);
rightPage.checkLeaf();
@ -464,8 +466,8 @@ class TreePage {
page.setLeaf(true);
}
insertTreeEntry(index, copy);
TreePage landed = null;// if we overflow - the page the
// replacement ends up on
// if we overflow - the page the replacement ends up on
TreePage landed = null;
TreeEntry removed = null;
if (isOverflowed()) {
TreePage parent = getParent();
@ -563,7 +565,8 @@ class TreePage {
setLeaf(!result);
}
private void checkParentIdForRemovedPageEntry(TreeEntry entry, long oldPageId, long newPageId) throws IOException {
private void checkParentIdForRemovedPageEntry(TreeEntry entry, long oldPageId, long newPageId)
throws IOException {
TreePage page = tree.lookupPage(entry.getPrevPageId());
if (page != null && page.getParentId() == oldPageId) {
page.setParentId(newPageId);

View File

@ -50,7 +50,8 @@ public class JMSConnectionStatsImpl extends StatsImpl {
public void reset() {
super.reset();
JMSSessionStatsImpl[] stats = getSessions();
for (int i = 0, size = stats.length; i < size; i++) {
int size = stats.length;
for (int i = 0; i < size; i++) {
stats[i].reset();
}
}
@ -61,7 +62,8 @@ public class JMSConnectionStatsImpl extends StatsImpl {
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
JMSSessionStatsImpl[] stats = getSessions();
for (int i = 0, size = stats.length; i < size; i++) {
int size = stats.length;
for (int i = 0; i < size; i++) {
stats[i].setEnabled(enabled);
}

View File

@ -44,10 +44,14 @@ public class JMSSessionStatsImpl extends StatsImpl {
this.messageCount = new CountStatisticImpl("messageCount", "Number of messages exchanged");
this.pendingMessageCount = new CountStatisticImpl("pendingMessageCount", "Number of pending messages");
this.expiredMessageCount = new CountStatisticImpl("expiredMessageCount", "Number of expired messages");
this.messageWaitTime = new TimeStatisticImpl("messageWaitTime", "Time spent by a message before being delivered");
this.durableSubscriptionCount = new CountStatisticImpl("durableSubscriptionCount", "The number of durable subscriptions");
this.messageWaitTime = new TimeStatisticImpl("messageWaitTime", "Time spent by a message before being delivered");
this.messageRateTime = new TimeStatisticImpl("messageRateTime", "Time taken to process a message (thoughtput rate)");
this.messageWaitTime = new TimeStatisticImpl("messageWaitTime",
"Time spent by a message before being delivered");
this.durableSubscriptionCount = new CountStatisticImpl("durableSubscriptionCount",
"The number of durable subscriptions");
this.messageWaitTime = new TimeStatisticImpl("messageWaitTime",
"Time spent by a message before being delivered");
this.messageRateTime = new TimeStatisticImpl("messageRateTime",
"Time taken to process a message (thoughtput rate)");
// lets add named stats
addStatistic("messageCount", messageCount);
@ -85,11 +89,13 @@ public class JMSSessionStatsImpl extends StatsImpl {
public void reset() {
super.reset();
JMSConsumerStatsImpl[] cstats = getConsumers();
for (int i = 0, size = cstats.length; i < size; i++) {
int size = cstats.length;
for (int i = 0; i < size; i++) {
cstats[i].reset();
}
JMSProducerStatsImpl[] pstats = getProducers();
for (int i = 0, size = pstats.length; i < size; i++) {
size = pstats.length;
for (int i = 0; i < size; i++) {
pstats[i].reset();
}
}
@ -100,11 +106,13 @@ public class JMSSessionStatsImpl extends StatsImpl {
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
JMSConsumerStatsImpl[] cstats = getConsumers();
for (int i = 0, size = cstats.length; i < size; i++) {
int size = cstats.length;
for (int i = 0; i < size; i++) {
cstats[i].setEnabled(enabled);
}
JMSProducerStatsImpl[] pstats = getProducers();
for (int i = 0, size = pstats.length; i < size; i++) {
size = pstats.length;
for (int i = 0; i < size; i++) {
pstats[i].setEnabled(enabled);
}

View File

@ -28,7 +28,7 @@ import org.apache.activemq.util.IndentPrinter;
* @version $Revision: 1.2 $
*/
public class JMSStatsImpl extends StatsImpl {
private List connections = new CopyOnWriteArrayList();
private List<ActiveMQConnection> connections = new CopyOnWriteArrayList<ActiveMQConnection>();
public JMSStatsImpl() {
}
@ -73,7 +73,8 @@ public class JMSStatsImpl extends StatsImpl {
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
JMSConnectionStatsImpl[] stats = getConnections();
for (int i = 0, size = stats.length; i < size; i++) {
int size = stats.length;
for (int i = 0; i < size; i++) {
stats[i].setEnabled(enabled);
}

View File

@ -24,12 +24,14 @@ import javax.management.j2ee.statistics.Statistic;
* @version $Revision: 1.2 $
*/
public class StatisticImpl implements Statistic, Resettable {
protected boolean enabled;
private String name;
private String unit;
private String description;
private long startTime;
private long lastSampleTime;
protected boolean enabled;
public StatisticImpl(String name, String unit, String description) {
this.name = name;

View File

@ -29,7 +29,7 @@ public class StatsImpl extends StatisticImpl implements Stats, Resettable {
private Map<String, StatisticImpl> map;
public StatsImpl() {
this(new HashMap());
this(new HashMap<String, StatisticImpl>());
}
public StatsImpl(Map<String, StatisticImpl> map) {
@ -39,7 +39,8 @@ public class StatsImpl extends StatisticImpl implements Stats, Resettable {
public void reset() {
Statistic[] stats = getStatistics();
for (int i = 0, size = stats.length; i < size; i++) {
int size = stats.length;
for (int i = 0; i < size; i++) {
Statistic stat = stats[i];
if (stat instanceof Resettable) {
Resettable r = (Resettable)stat;

View File

@ -25,10 +25,10 @@ import java.util.Map;
* @version $Revision:$
*/
public class LRUMap extends LinkedHashMap {
private static final long serialVersionUID = -9179676638408888162L;
protected static final float DEFAULT_LOAD_FACTOR = (float) 0.75;
protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
private static final long serialVersionUID = -9179676638408888162L;
private int maximumSize;

View File

@ -31,7 +31,7 @@ public class MapCache implements Cache {
protected final Map<Object, Object> map;
public MapCache() {
this(new ConcurrentHashMap());
this(new ConcurrentHashMap<Object, Object>());
}
public MapCache(Map<Object, Object> map) {

View File

@ -34,7 +34,7 @@ import org.apache.activemq.command.Message;
public class MessageQueue {
private MessageBuffer buffer;
private LinkedList list = new LinkedList();
private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private int size;
private Object lock = new Object();
private int position;
@ -81,15 +81,15 @@ public class MessageQueue {
/**
* Returns a copy of the list
*/
public List getList() {
public List<MessageReference> getList() {
synchronized (lock) {
return new ArrayList(list);
return new ArrayList<MessageReference>(list);
}
}
public void appendMessages(List answer) {
public void appendMessages(List<MessageReference> answer) {
synchronized (lock) {
for (Iterator iter = list.iterator(); iter.hasNext();) {
for (Iterator<MessageReference> iter = list.iterator(); iter.hasNext();) {
answer.add(iter.next());
}
}

View File

@ -28,7 +28,7 @@ import java.util.LinkedList;
public class OrderBasedMessageBuffer implements MessageBuffer {
private int limit = 100 * 64 * 1024;
private LinkedList list = new LinkedList();
private LinkedList<MessageQueue> list = new LinkedList<MessageQueue>();
private int size;
private Object lock = new Object();
@ -63,7 +63,7 @@ public class OrderBasedMessageBuffer implements MessageBuffer {
list.addLast(queue);
size += delta;
while (size > limit) {
MessageQueue biggest = (MessageQueue) list.removeFirst();
MessageQueue biggest = list.removeFirst();
size -= biggest.evictMessage();
}
}
@ -71,8 +71,8 @@ public class OrderBasedMessageBuffer implements MessageBuffer {
public void clear() {
synchronized (lock) {
for (Iterator iter = list.iterator(); iter.hasNext();) {
MessageQueue queue = (MessageQueue) iter.next();
for (Iterator<MessageQueue> iter = list.iterator(); iter.hasNext();) {
MessageQueue queue = iter.next();
queue.clear();
}
size = 0;

View File

@ -28,7 +28,7 @@ import java.util.List;
public class SizeBasedMessageBuffer implements MessageBuffer {
private int limit = 100 * 64 * 1024;
private List bubbleList = new ArrayList();
private List<MessageQueue> bubbleList = new ArrayList<MessageQueue>();
private int size;
private Object lock = new Object();
@ -69,7 +69,7 @@ public class SizeBasedMessageBuffer implements MessageBuffer {
size += delta;
while (size > limit) {
MessageQueue biggest = (MessageQueue)bubbleList.get(0);
MessageQueue biggest = bubbleList.get(0);
size -= biggest.evictMessage();
bubbleDown(biggest, 0);
@ -79,8 +79,8 @@ public class SizeBasedMessageBuffer implements MessageBuffer {
public void clear() {
synchronized (lock) {
for (Iterator iter = bubbleList.iterator(); iter.hasNext();) {
MessageQueue queue = (MessageQueue)iter.next();
for (Iterator<MessageQueue> iter = bubbleList.iterator(); iter.hasNext();) {
MessageQueue queue = iter.next();
queue.clear();
}
size = 0;
@ -91,7 +91,7 @@ public class SizeBasedMessageBuffer implements MessageBuffer {
// lets bubble up to head of queueif we need to
int position = queue.getPosition();
while (--position >= 0) {
MessageQueue pivot = (MessageQueue)bubbleList.get(position);
MessageQueue pivot = bubbleList.get(position);
if (pivot.getSize() < queueSize) {
swap(position, pivot, position + 1, queue);
} else {
@ -102,8 +102,9 @@ public class SizeBasedMessageBuffer implements MessageBuffer {
protected void bubbleDown(MessageQueue biggest, int position) {
int queueSize = biggest.getSize();
for (int second = position + 1, end = bubbleList.size(); second < end; second++) {
MessageQueue pivot = (MessageQueue)bubbleList.get(second);
int end = bubbleList.size();
for (int second = position + 1; second < end; second++) {
MessageQueue pivot = bubbleList.get(second);
if (pivot.getSize() > queueSize) {
swap(position, biggest, second, pivot);
} else {

View File

@ -69,16 +69,16 @@ public class DestinationBasedMessageList implements MessageList {
queue.add(node);
}
public List getMessages(Subscription sub) {
public List<MessageReference> getMessages(Subscription sub) {
return getMessages(sub.getConsumerInfo().getDestination());
}
public List getMessages(ActiveMQDestination destination) {
public List<MessageReference> getMessages(ActiveMQDestination destination) {
Set set = null;
synchronized (lock) {
set = subscriptionIndex.get(destination);
}
List answer = new ArrayList();
List<MessageReference> answer = new ArrayList<MessageReference>();
for (Iterator iter = set.iterator(); iter.hasNext();) {
MessageQueue queue = (MessageQueue) iter.next();
queue.appendMessages(answer);
@ -87,8 +87,8 @@ public class DestinationBasedMessageList implements MessageList {
}
public Message[] browse(ActiveMQDestination destination) {
List result = getMessages(destination);
return (Message[])result.toArray(new Message[result.size()]);
List<MessageReference> result = getMessages(destination);
return result.toArray(new Message[result.size()]);
}

View File

@ -38,7 +38,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class SimpleMessageList implements MessageList {
private static final Log LOG = LogFactory.getLog(SimpleMessageList.class);
private LinkedList list = new LinkedList();
private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private int maximumSize = 100 * 64 * 1024;
private int size;
private Object lock = new Object();
@ -56,22 +56,22 @@ public class SimpleMessageList implements MessageList {
list.add(node);
size += delta;
while (size > maximumSize) {
MessageReference evicted = (MessageReference)list.removeFirst();
MessageReference evicted = list.removeFirst();
size -= evicted.getMessageHardRef().getSize();
}
}
}
public List getMessages(ActiveMQDestination destination) {
public List<MessageReference> getMessages(ActiveMQDestination destination) {
return getList();
}
public Message[] browse(ActiveMQDestination destination) {
List result = new ArrayList();
List<Message> result = new ArrayList<Message>();
DestinationFilter filter = DestinationFilter.parseFilter(destination);
synchronized (lock) {
for (Iterator i = list.iterator(); i.hasNext();) {
MessageReference ref = (MessageReference)i.next();
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
Message msg;
try {
msg = ref.getMessage();
@ -84,15 +84,15 @@ public class SimpleMessageList implements MessageList {
}
}
return (Message[])result.toArray(new Message[result.size()]);
return result.toArray(new Message[result.size()]);
}
/**
* Returns a copy of the list
*/
public List getList() {
public List<MessageReference> getList() {
synchronized (lock) {
return new ArrayList(list);
return new ArrayList<MessageReference>(list);
}
}

View File

@ -31,7 +31,7 @@ import org.apache.activemq.command.ConsumerInfo;
public class DemandSubscription {
private ConsumerInfo remoteInfo;
private ConsumerInfo localInfo;
private Set remoteSubsIds = new CopyOnWriteArraySet();
private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
private AtomicInteger dispatched = new AtomicInteger(0);
DemandSubscription(ConsumerInfo info) {

View File

@ -23,7 +23,10 @@ import org.apache.activemq.transport.Transport;
*
* @version $Revision: 1.1 $
*/
public class NetworkBridgeFactory {
public final class NetworkBridgeFactory {
private NetworkBridgeFactory() {
}
/**
* Create a network bridge

View File

@ -25,14 +25,9 @@ import org.apache.activemq.command.Command;
*
* @version $Revision$
*/
public class CommandIdComparator implements Comparator {
public class CommandIdComparator implements Comparator<Command> {
public int compare(Object o1, Object o2) {
assert o1 instanceof Command;
assert o2 instanceof Command;
Command c1 = (Command) o1;
Command c2 = (Command) o2;
public int compare(Command c1, Command c2) {
return c1.getCommandId() - c2.getCommandId();
}

View File

@ -35,7 +35,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
private boolean tcpNoDelayEnabled = true;
private boolean cacheEnabled = true;
private boolean tightEncodingEnabled = true;
private boolean sizePrefixDisabled = false;
private boolean sizePrefixDisabled;
private long maxInactivityDuration = 30 * 1000;
private int cacheSize = 1024;

View File

@ -282,14 +282,16 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
@SuppressWarnings("deprecation")
protected String tightUnmarshalString(DataInput dataIn, BooleanStream bs) throws IOException {
if (bs.readBoolean()) {
if (bs.readBoolean()) {
int size = dataIn.readShort();
byte data[] = new byte[size];
dataIn.readFully(data);
return new String(data, 0); // Yes deprecated, but we know what
// we are doing.
// Yes deprecated, but we know what we are doing.
// This allows us to create a String from a ASCII byte array. (no UTF-8 decoding)
return new String(data, 0);
} else {
return dataIn.readUTF();
}
@ -305,7 +307,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
int strlen = value.length();
int utflen = 0;
char[] charr = new char[strlen];
int c, count = 0;
int c = 0;
boolean isOnlyAscii = true;
value.getChars(0, strlen, charr, 0);

View File

@ -28,7 +28,7 @@ import org.apache.activemq.openwire.OpenWireFormat;
*
* @version $Revision$
*/
public class MarshallerFactory {
public final class MarshallerFactory {
/**
* Creates a Map of command type -> Marshallers
@ -92,6 +92,9 @@ public class MarshallerFactory {
}
private MarshallerFactory() {
}
private static void add(DataStreamMarshaller dsm) {
MARSHALLER[dsm.getDataStructureType()] = dsm;
}

View File

@ -282,14 +282,16 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
@SuppressWarnings("deprecation")
protected String tightUnmarshalString(DataInput dataIn, BooleanStream bs) throws IOException {
if (bs.readBoolean()) {
if (bs.readBoolean()) {
int size = dataIn.readShort();
byte data[] = new byte[size];
dataIn.readFully(data);
return new String(data, 0); // Yes deprecated, but we know what
// we are doing.
// Yes deprecated, but we know what we are doing.
// This allows us to create a String from a ASCII byte array. (no UTF-8 decoding)
return new String(data, 0);
} else {
return dataIn.readUTF();
}
@ -305,7 +307,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
int strlen = value.length();
int utflen = 0;
char[] charr = new char[strlen];
int c, count = 0;
int c = 0;
boolean isOnlyAscii = true;
value.getChars(0, strlen, charr, 0);

View File

@ -31,7 +31,7 @@ import org.apache.activemq.openwire.OpenWireFormat;
*
* @version $Revision$
*/
public class MarshallerFactory {
public final class MarshallerFactory {
/**
* Creates a Map of command type -> Marshallers
@ -96,6 +96,9 @@ public class MarshallerFactory {
}
private MarshallerFactory() {
}
private static void add(DataStreamMarshaller dsm) {
MARSHALLER[dsm.getDataStructureType()] = dsm;
}

View File

@ -35,7 +35,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
static {
Constructor constructor = null;
try {
constructor = StackTraceElement.class.getConstructor(new Class[] {String.class, String.class, String.class, int.class});
constructor = StackTraceElement.class.getConstructor(new Class[] {String.class, String.class,
String.class, int.class});
} catch (Throwable e) {
}
STACK_TRACE_ELEMENT_CONSTRUCTOR = constructor;
@ -49,10 +50,12 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
return 0;
}
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs)
throws IOException {
}
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs)
throws IOException {
}
public int tightMarshalLong1(OpenWireFormat wireFormat, long o, BooleanStream bs) throws IOException {
@ -75,7 +78,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
public void tightMarshalLong2(OpenWireFormat wireFormat, long o, DataOutput dataOut, BooleanStream bs) throws IOException {
public void tightMarshalLong2(OpenWireFormat wireFormat, long o, DataOutput dataOut, BooleanStream bs)
throws IOException {
if (bs.readBoolean()) {
if (bs.readBoolean()) {
dataOut.writeLong(o);
@ -89,7 +93,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
public long tightUnmarshalLong(OpenWireFormat wireFormat, DataInput dataIn, BooleanStream bs) throws IOException {
public long tightUnmarshalLong(OpenWireFormat wireFormat, DataInput dataIn, BooleanStream bs)
throws IOException {
if (bs.readBoolean()) {
if (bs.readBoolean()) {
return dataIn.readLong();
@ -117,19 +122,23 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
return answer & 0xffffffffL;
}
protected DataStructure tightUnmarsalNestedObject(OpenWireFormat wireFormat, DataInput dataIn, BooleanStream bs) throws IOException {
protected DataStructure tightUnmarsalNestedObject(OpenWireFormat wireFormat, DataInput dataIn,
BooleanStream bs) throws IOException {
return wireFormat.tightUnmarshalNestedObject(dataIn, bs);
}
protected int tightMarshalNestedObject1(OpenWireFormat wireFormat, DataStructure o, BooleanStream bs) throws IOException {
protected int tightMarshalNestedObject1(OpenWireFormat wireFormat, DataStructure o, BooleanStream bs)
throws IOException {
return wireFormat.tightMarshalNestedObject1(o, bs);
}
protected void tightMarshalNestedObject2(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut, BooleanStream bs) throws IOException {
protected void tightMarshalNestedObject2(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut,
BooleanStream bs) throws IOException {
wireFormat.tightMarshalNestedObject2(o, dataOut, bs);
}
protected DataStructure tightUnmarsalCachedObject(OpenWireFormat wireFormat, DataInput dataIn, BooleanStream bs) throws IOException {
protected DataStructure tightUnmarsalCachedObject(OpenWireFormat wireFormat, DataInput dataIn,
BooleanStream bs) throws IOException {
if (wireFormat.isCacheEnabled()) {
if (bs.readBoolean()) {
short index = dataIn.readShort();
@ -145,7 +154,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected int tightMarshalCachedObject1(OpenWireFormat wireFormat, DataStructure o, BooleanStream bs) throws IOException {
protected int tightMarshalCachedObject1(OpenWireFormat wireFormat, DataStructure o, BooleanStream bs)
throws IOException {
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
bs.writeBoolean(index == null);
@ -161,7 +171,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void tightMarshalCachedObject2(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut, BooleanStream bs) throws IOException {
protected void tightMarshalCachedObject2(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut,
BooleanStream bs) throws IOException {
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
if (bs.readBoolean()) {
@ -175,7 +186,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected Throwable tightUnmarsalThrowable(OpenWireFormat wireFormat, DataInput dataIn, BooleanStream bs) throws IOException {
protected Throwable tightUnmarsalThrowable(OpenWireFormat wireFormat, DataInput dataIn, BooleanStream bs)
throws IOException {
if (bs.readBoolean()) {
String clazz = tightUnmarshalString(dataIn, bs);
String message = tightUnmarshalString(dataIn, bs);
@ -185,10 +197,11 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
StackTraceElement ss[] = new StackTraceElement[dataIn.readShort()];
for (int i = 0; i < ss.length; i++) {
try {
ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR.newInstance(new Object[] {tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
Integer.valueOf(dataIn.readInt())});
ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR
.newInstance(new Object[] {tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
Integer.valueOf(dataIn.readInt())});
} catch (IOException e) {
throw e;
} catch (Throwable e) {
@ -223,7 +236,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected int tightMarshalThrowable1(OpenWireFormat wireFormat, Throwable o, BooleanStream bs) throws IOException {
protected int tightMarshalThrowable1(OpenWireFormat wireFormat, Throwable o, BooleanStream bs)
throws IOException {
if (o == null) {
bs.writeBoolean(false);
return 0;
@ -248,7 +262,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void tightMarshalThrowable2(OpenWireFormat wireFormat, Throwable o, DataOutput dataOut, BooleanStream bs) throws IOException {
protected void tightMarshalThrowable2(OpenWireFormat wireFormat, Throwable o, DataOutput dataOut,
BooleanStream bs) throws IOException {
if (bs.readBoolean()) {
tightMarshalString2(o.getClass().getName(), dataOut, bs);
tightMarshalString2(o.getMessage(), dataOut, bs);
@ -267,14 +282,16 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
@SuppressWarnings("deprecation")
protected String tightUnmarshalString(DataInput dataIn, BooleanStream bs) throws IOException {
if (bs.readBoolean()) {
if (bs.readBoolean()) {
int size = dataIn.readShort();
byte data[] = new byte[size];
dataIn.readFully(data);
return new String(data, 0); // Yes deprecated, but we know what
// we are doing.
// Yes deprecated, but we know what we are doing.
// This allows us to create a String from a ASCII byte array. (no UTF-8 decoding)
return new String(data, 0);
} else {
return dataIn.readUTF();
}
@ -290,7 +307,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
int strlen = value.length();
int utflen = 0;
char[] charr = new char[strlen];
int c, count = 0;
int c = 0;
boolean isOnlyAscii = true;
value.getChars(0, strlen, charr, 0);
@ -331,7 +348,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected int tightMarshalObjectArray1(OpenWireFormat wireFormat, DataStructure[] objects, BooleanStream bs) throws IOException {
protected int tightMarshalObjectArray1(OpenWireFormat wireFormat, DataStructure[] objects,
BooleanStream bs) throws IOException {
if (objects != null) {
int rc = 0;
bs.writeBoolean(true);
@ -346,7 +364,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void tightMarshalObjectArray2(OpenWireFormat wireFormat, DataStructure[] objects, DataOutput dataOut, BooleanStream bs) throws IOException {
protected void tightMarshalObjectArray2(OpenWireFormat wireFormat, DataStructure[] objects,
DataOutput dataOut, BooleanStream bs) throws IOException {
if (bs.readBoolean()) {
dataOut.writeShort(objects.length);
for (int i = 0; i < objects.length; i++) {
@ -359,11 +378,13 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
return i;
}
protected void tightMarshalConstByteArray2(byte[] data, DataOutput dataOut, BooleanStream bs, int i) throws IOException {
protected void tightMarshalConstByteArray2(byte[] data, DataOutput dataOut, BooleanStream bs, int i)
throws IOException {
dataOut.write(data, 0, i);
}
protected byte[] tightUnmarshalConstByteArray(DataInput dataIn, BooleanStream bs, int i) throws IOException {
protected byte[] tightUnmarshalConstByteArray(DataInput dataIn, BooleanStream bs, int i)
throws IOException {
byte data[] = new byte[i];
dataIn.readFully(data);
return data;
@ -378,7 +399,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void tightMarshalByteArray2(byte[] data, DataOutput dataOut, BooleanStream bs) throws IOException {
protected void tightMarshalByteArray2(byte[] data, DataOutput dataOut, BooleanStream bs)
throws IOException {
if (bs.readBoolean()) {
dataOut.writeInt(data.length);
dataOut.write(data);
@ -404,7 +426,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void tightMarshalByteSequence2(ByteSequence data, DataOutput dataOut, BooleanStream bs) throws IOException {
protected void tightMarshalByteSequence2(ByteSequence data, DataOutput dataOut, BooleanStream bs)
throws IOException {
if (bs.readBoolean()) {
dataOut.writeInt(data.getLength());
dataOut.write(data.getData(), data.getOffset(), data.getLength());
@ -440,15 +463,18 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
return dataIn.readLong();
}
protected DataStructure looseUnmarsalNestedObject(OpenWireFormat wireFormat, DataInput dataIn) throws IOException {
protected DataStructure looseUnmarsalNestedObject(OpenWireFormat wireFormat, DataInput dataIn)
throws IOException {
return wireFormat.looseUnmarshalNestedObject(dataIn);
}
protected void looseMarshalNestedObject(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut) throws IOException {
protected void looseMarshalNestedObject(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut)
throws IOException {
wireFormat.looseMarshalNestedObject(o, dataOut);
}
protected DataStructure looseUnmarsalCachedObject(OpenWireFormat wireFormat, DataInput dataIn) throws IOException {
protected DataStructure looseUnmarsalCachedObject(OpenWireFormat wireFormat, DataInput dataIn)
throws IOException {
if (wireFormat.isCacheEnabled()) {
if (dataIn.readBoolean()) {
short index = dataIn.readShort();
@ -464,7 +490,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void looseMarshalCachedObject(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut) throws IOException {
protected void looseMarshalCachedObject(OpenWireFormat wireFormat, DataStructure o, DataOutput dataOut)
throws IOException {
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
dataOut.writeBoolean(index == null);
@ -480,7 +507,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected Throwable looseUnmarsalThrowable(OpenWireFormat wireFormat, DataInput dataIn) throws IOException {
protected Throwable looseUnmarsalThrowable(OpenWireFormat wireFormat, DataInput dataIn)
throws IOException {
if (dataIn.readBoolean()) {
String clazz = looseUnmarshalString(dataIn);
String message = looseUnmarshalString(dataIn);
@ -490,8 +518,11 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
StackTraceElement ss[] = new StackTraceElement[dataIn.readShort()];
for (int i = 0; i < ss.length; i++) {
try {
ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR.newInstance(new Object[] {looseUnmarshalString(dataIn), looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn), Integer.valueOf(dataIn.readInt())});
ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR
.newInstance(new Object[] {looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn),
Integer.valueOf(dataIn.readInt())});
} catch (IOException e) {
throw e;
} catch (Throwable e) {
@ -516,7 +547,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void looseMarshalThrowable(OpenWireFormat wireFormat, Throwable o, DataOutput dataOut) throws IOException {
protected void looseMarshalThrowable(OpenWireFormat wireFormat, Throwable o, DataOutput dataOut)
throws IOException {
dataOut.writeBoolean(o != null);
if (o != null) {
looseMarshalString(o.getClass().getName(), dataOut);
@ -551,7 +583,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void looseMarshalObjectArray(OpenWireFormat wireFormat, DataStructure[] objects, DataOutput dataOut) throws IOException {
protected void looseMarshalObjectArray(OpenWireFormat wireFormat, DataStructure[] objects,
DataOutput dataOut) throws IOException {
dataOut.writeBoolean(objects != null);
if (objects != null) {
dataOut.writeShort(objects.length);
@ -561,7 +594,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
}
}
protected void looseMarshalConstByteArray(OpenWireFormat wireFormat, byte[] data, DataOutput dataOut, int i) throws IOException {
protected void looseMarshalConstByteArray(OpenWireFormat wireFormat, byte[] data, DataOutput dataOut,
int i) throws IOException {
dataOut.write(data, 0, i);
}
@ -571,7 +605,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
return data;
}
protected void looseMarshalByteArray(OpenWireFormat wireFormat, byte[] data, DataOutput dataOut) throws IOException {
protected void looseMarshalByteArray(OpenWireFormat wireFormat, byte[] data, DataOutput dataOut)
throws IOException {
dataOut.writeBoolean(data != null);
if (data != null) {
dataOut.writeInt(data.length);
@ -589,7 +624,8 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
return rc;
}
protected void looseMarshalByteSequence(OpenWireFormat wireFormat, ByteSequence data, DataOutput dataOut) throws IOException {
protected void looseMarshalByteSequence(OpenWireFormat wireFormat, ByteSequence data, DataOutput dataOut)
throws IOException {
dataOut.writeBoolean(data != null);
if (data != null) {
dataOut.writeInt(data.getLength());

View File

@ -31,7 +31,7 @@ import org.apache.activemq.openwire.OpenWireFormat;
*
* @version $Revision$
*/
public class MarshallerFactory {
public final class MarshallerFactory {
/**
* Creates a Map of command type -> Marshallers
@ -98,6 +98,9 @@ public class MarshallerFactory {
}
private MarshallerFactory() {
}
private static void add(DataStreamMarshaller dsm) {
MARSHALLER[dsm.getDataStructureType()] = dsm;
}

View File

@ -43,7 +43,7 @@ public class ConnectionPool {
private TransactionManager transactionManager;
private ActiveMQConnection connection;
private Map cache;
private Map<SessionKey, SessionPool> cache;
private AtomicBoolean started = new AtomicBoolean(false);
private int referenceCount;
private ObjectPoolFactory poolFactory;
@ -54,7 +54,7 @@ public class ConnectionPool {
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory,
TransactionManager transactionManager) {
this(connection, new HashMap(), poolFactory, transactionManager);
this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory, transactionManager);
// Add a transport Listener so that we can notice if this connection
// should be expired due to
// a connection failure.
@ -76,7 +76,7 @@ public class ConnectionPool {
});
}
public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory,
public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory,
TransactionManager transactionManager) {
this.connection = connection;
this.cache = cache;
@ -102,7 +102,7 @@ public class ConnectionPool {
ackMode = Session.SESSION_TRANSACTED;
}
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = (SessionPool)cache.get(key);
SessionPool pool = cache.get(key);
if (pool == null) {
pool = new SessionPool(this, key, poolFactory.createPool());
cache.put(key, pool);
@ -129,9 +129,9 @@ public class ConnectionPool {
public synchronized void close() {
if (connection != null) {
try {
Iterator i = cache.values().iterator();
Iterator<SessionPool> i = cache.values().iterator();
while (i.hasNext()) {
SessionPool pool = (SessionPool)i.next();
SessionPool pool = i.next();
i.remove();
try {
pool.close();

View File

@ -47,7 +47,7 @@ import org.apache.commons.pool.impl.GenericObjectPoolFactory;
*/
public class PooledConnectionFactory implements ConnectionFactory, Service {
private ConnectionFactory connectionFactory;
private Map cache = new HashMap();
private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
private ObjectPoolFactory poolFactory;
private int maximumActive = 500;
private int maxConnections = 1;
@ -87,16 +87,16 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
public synchronized Connection createConnection(String userName, String password) throws JMSException {
ConnectionKey key = new ConnectionKey(userName, password);
LinkedList pools = (LinkedList)cache.get(key);
LinkedList<ConnectionPool> pools = cache.get(key);
if (pools == null) {
pools = new LinkedList();
pools = new LinkedList<ConnectionPool>();
cache.put(key, pools);
}
ConnectionPool connection = null;
if (pools.size() == maxConnections) {
connection = (ConnectionPool)pools.removeFirst();
connection = pools.removeFirst();
}
// Now.. we might get a connection, but it might be that we need to
@ -138,8 +138,8 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
}
public void stop() throws Exception {
for (Iterator iter = cache.values().iterator(); iter.hasNext();) {
LinkedList list = (LinkedList)iter.next();
for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
LinkedList list = iter.next();
for (Iterator i = list.iterator(); i.hasNext();) {
ConnectionPool connection = (ConnectionPool)i.next();
connection.close();

View File

@ -63,10 +63,10 @@ public class PooledSession implements TopicSession, QueueSession {
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true;
private boolean ignoreClose = false;
private boolean ignoreClose;
private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession;
@ -90,14 +90,14 @@ public class PooledSession implements TopicSession, QueueSession {
getSession().setMessageListener(null);
// Close any consumers and browsers that may have been created.
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = (MessageConsumer)iter.next();
for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = iter.next();
consumer.close();
}
consumers.clear();
for (Iterator iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = (QueueBrowser)iter.next();
for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = iter.next();
browser.close();
}
browsers.clear();

View File

@ -46,8 +46,7 @@ public class ProxyConnector implements Service {
private URI remote;
private URI localUri;
private String name;
CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
private CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
public void start() throws Exception {
@ -77,9 +76,9 @@ public class ProxyConnector implements Service {
if (this.server != null) {
ss.stop(this.server);
}
for (Iterator iter = connections.iterator(); iter.hasNext();) {
for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
LOG.info("Connector stopped: Stopping proxy.");
ss.stop((Service)iter.next());
ss.stop(iter.next());
}
ss.throwFirstException();
LOG.info("Proxy Connector " + getName() + " Stopped");

View File

@ -54,7 +54,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
// if(!((ActiveMQTempDestination)destination).getConnectionId().equals(context.getConnectionId().getValue())
// ) {
if (!securityContext.isBrokerContext()) {
Set allowedACLs = null;
Set<?> allowedACLs = null;
if (!destination.isTemporary()) {
allowedACLs = authorizationMap.getAdminACLs(destination);
} else {
@ -77,7 +77,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
if (securityContext == null) {
throw new SecurityException("User is not authenticated.");
}
Set allowedACLs = null;
Set<?> allowedACLs = null;
if (!destination.isTemporary()) {
allowedACLs = authorizationMap.getAdminACLs(destination);
} else {
@ -96,7 +96,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
if (subject == null) {
throw new SecurityException("User is not authenticated.");
}
Set allowedACLs = null;
Set<?> allowedACLs = null;
if (!info.getDestination().isTemporary()) {
allowedACLs = authorizationMap.getReadACLs(info.getDestination());
} else {
@ -140,7 +140,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
}
if (info.getDestination() != null) {
Set allowedACLs = null;
Set<?> allowedACLs = null;
if (!info.getDestination().isTemporary()) {
allowedACLs = authorizationMap.getWriteACLs(info.getDestination());
} else {
@ -162,7 +162,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
}
if (!subject.getAuthorizedWriteDests().contains(messageSend.getDestination())) {
Set allowedACLs = null;
Set<?> allowedACLs = null;
if (!messageSend.getDestination().isTemporary()) {
allowedACLs = authorizationMap.getWriteACLs(messageSend.getDestination());
} else {

View File

@ -35,13 +35,13 @@ import org.apache.activemq.filter.DestinationMapEntry;
*/
public class AuthorizationEntry extends DestinationMapEntry {
private Set readACLs = Collections.EMPTY_SET;
private Set writeACLs = Collections.EMPTY_SET;
private Set adminACLs = Collections.EMPTY_SET;
private Set<Object> readACLs = emptySet();
private Set<Object> writeACLs = emptySet();
private Set<Object> adminACLs = emptySet();
private String adminRoles = null;
private String readRoles = null;
private String writeRoles = null;
private String adminRoles;
private String readRoles;
private String writeRoles;
private String groupClass = "org.apache.activemq.jaas.GroupPrincipal";
@ -49,31 +49,36 @@ public class AuthorizationEntry extends DestinationMapEntry {
return groupClass;
}
@SuppressWarnings("unchecked")
private Set<Object> emptySet() {
return Collections.EMPTY_SET;
}
public void setGroupClass(String groupClass) {
this.groupClass = groupClass;
}
public Set getAdminACLs() {
public Set<Object> getAdminACLs() {
return adminACLs;
}
public void setAdminACLs(Set adminACLs) {
public void setAdminACLs(Set<Object> adminACLs) {
this.adminACLs = adminACLs;
}
public Set getReadACLs() {
public Set<Object> getReadACLs() {
return readACLs;
}
public void setReadACLs(Set readACLs) {
public void setReadACLs(Set<Object> readACLs) {
this.readACLs = readACLs;
}
public Set getWriteACLs() {
public Set<Object> getWriteACLs() {
return writeACLs;
}
public void setWriteACLs(Set writeACLs) {
public void setWriteACLs(Set<Object> writeACLs) {
this.writeACLs = writeACLs;
}
@ -100,8 +105,8 @@ public class AuthorizationEntry extends DestinationMapEntry {
setWriteACLs(parseACLs(writeRoles));
}
protected Set parseACLs(String roles) throws Exception {
Set answer = new HashSet();
protected Set<Object> parseACLs(String roles) throws Exception {
Set<Object> answer = new HashSet<Object>();
StringTokenizer iter = new StringTokenizer(roles, ",");
while (iter.hasMoreTokens()) {
String name = iter.nextToken().trim();

View File

@ -28,33 +28,33 @@ public interface AuthorizationMap {
/**
* Returns the set of all ACLs capable of administering temp destination
*/
Set getTempDestinationAdminACLs();
Set<?> getTempDestinationAdminACLs();
/**
* Returns the set of all ACLs capable of reading from temp destination
*/
Set getTempDestinationReadACLs();
Set<?> getTempDestinationReadACLs();
/**
* Returns the set of all ACLs capable of writing to temp destination
*/
Set getTempDestinationWriteACLs();
Set<?> getTempDestinationWriteACLs();
/**
* Returns the set of all ACLs capable of administering the given
* destination
*/
Set getAdminACLs(ActiveMQDestination destination);
Set<?> getAdminACLs(ActiveMQDestination destination);
/**
* Returns the set of all ACLs capable of reading (consuming from) the given
* destination
*/
Set getReadACLs(ActiveMQDestination destination);
Set<?> getReadACLs(ActiveMQDestination destination);
/**
* Returns the set of all ACLs capable of writing to the given destination
*/
Set getWriteACLs(ActiveMQDestination destination);
Set<?> getWriteACLs(ActiveMQDestination destination);
}

View File

@ -55,7 +55,7 @@ public class DefaultAuthorizationMap extends DestinationMap implements Authoriza
return this.tempDestinationAuthorizationEntry;
}
public Set getTempDestinationAdminACLs() {
public Set<Object> getTempDestinationAdminACLs() {
if (tempDestinationAuthorizationEntry != null) {
return tempDestinationAuthorizationEntry.getAdminACLs();
} else {
@ -63,7 +63,7 @@ public class DefaultAuthorizationMap extends DestinationMap implements Authoriza
}
}
public Set getTempDestinationReadACLs() {
public Set<Object> getTempDestinationReadACLs() {
if (tempDestinationAuthorizationEntry != null) {
return tempDestinationAuthorizationEntry.getReadACLs();
} else {
@ -71,7 +71,7 @@ public class DefaultAuthorizationMap extends DestinationMap implements Authoriza
}
}
public Set getTempDestinationWriteACLs() {
public Set<Object> getTempDestinationWriteACLs() {
if (tempDestinationAuthorizationEntry != null) {
return tempDestinationAuthorizationEntry.getWriteACLs();
} else {
@ -79,36 +79,36 @@ public class DefaultAuthorizationMap extends DestinationMap implements Authoriza
}
}
public Set getAdminACLs(ActiveMQDestination destination) {
Set entries = getAllEntries(destination);
Set answer = new HashSet();
public Set<Object> getAdminACLs(ActiveMQDestination destination) {
Set<AuthorizationEntry> entries = getAllEntries(destination);
Set<Object> answer = new HashSet<Object>();
// now lets go through each entry adding individual
for (Iterator iter = entries.iterator(); iter.hasNext();) {
AuthorizationEntry entry = (AuthorizationEntry)iter.next();
for (Iterator<AuthorizationEntry> iter = entries.iterator(); iter.hasNext();) {
AuthorizationEntry entry = iter.next();
answer.addAll(entry.getAdminACLs());
}
return answer;
}
public Set getReadACLs(ActiveMQDestination destination) {
Set entries = getAllEntries(destination);
Set answer = new HashSet();
public Set<Object> getReadACLs(ActiveMQDestination destination) {
Set<AuthorizationEntry> entries = getAllEntries(destination);
Set<Object> answer = new HashSet<Object>();
// now lets go through each entry adding individual
for (Iterator iter = entries.iterator(); iter.hasNext();) {
AuthorizationEntry entry = (AuthorizationEntry)iter.next();
for (Iterator<AuthorizationEntry> iter = entries.iterator(); iter.hasNext();) {
AuthorizationEntry entry = iter.next();
answer.addAll(entry.getReadACLs());
}
return answer;
}
public Set getWriteACLs(ActiveMQDestination destination) {
Set entries = getAllEntries(destination);
Set answer = new HashSet();
public Set<Object> getWriteACLs(ActiveMQDestination destination) {
Set<AuthorizationEntry> entries = getAllEntries(destination);
Set<Object> answer = new HashSet<Object>();
// now lets go through each entry adding individual
for (Iterator iter = entries.iterator(); iter.hasNext();) {
AuthorizationEntry entry = (AuthorizationEntry)iter.next();
for (Iterator<AuthorizationEntry> iter = entries.iterator(); iter.hasNext();) {
AuthorizationEntry entry = iter.next();
answer.addAll(entry.getWriteACLs());
}
return answer;
@ -139,12 +139,12 @@ public class DefaultAuthorizationMap extends DestinationMap implements Authoriza
this.defaultEntry = defaultEntry;
}
protected Class getEntryClass() {
protected Class<AuthorizationEntry> getEntryClass() {
return AuthorizationEntry.class;
}
protected Set getAllEntries(ActiveMQDestination destination) {
Set entries = get(destination);
protected Set<AuthorizationEntry> getAllEntries(ActiveMQDestination destination) {
Set<AuthorizationEntry> entries = get(destination);
if (defaultEntry != null) {
entries.add(defaultEntry);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.security;
import java.security.Principal;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
@ -37,7 +38,7 @@ import org.apache.activemq.jaas.JassCredentialCallbackHandler;
public class JaasAuthenticationBroker extends BrokerFilter {
private final String jassConfiguration;
private final CopyOnWriteArrayList securityContexts = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList<SecurityContext> securityContexts = new CopyOnWriteArrayList<SecurityContext>();
public JaasAuthenticationBroker(Broker next, String jassConfiguration) {
super(next);
@ -53,7 +54,7 @@ public class JaasAuthenticationBroker extends BrokerFilter {
this.subject = subject;
}
public Set getPrincipals() {
public Set<Principal> getPrincipals() {
return subject.getPrincipals();
}
@ -102,8 +103,8 @@ public class JaasAuthenticationBroker extends BrokerFilter {
* Refresh all the logged into users.
*/
public void refresh() {
for (Iterator iter = securityContexts.iterator(); iter.hasNext();) {
SecurityContext sc = (SecurityContext)iter.next();
for (Iterator<SecurityContext> iter = securityContexts.iterator(); iter.hasNext();) {
SecurityContext sc = iter.next();
sc.getAuthorizedReadDests().clear();
sc.getAuthorizedWriteDests().clear();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.security;
import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.Set;
@ -38,7 +39,7 @@ public class JaasCertificateSecurityContext extends SecurityContext {
this.certs = certs;
}
public Set getPrincipals() {
public Set<Principal> getPrincipals() {
return subject.getPrincipals();
}

View File

@ -47,8 +47,6 @@ import org.apache.commons.logging.LogFactory;
*/
public class LDAPAuthorizationMap implements AuthorizationMap {
private static Log log = LogFactory.getLog(LDAPLoginModule.class);
public static final String INITIAL_CONTEXT_FACTORY = "initialContextFactory";
public static final String CONNECTION_URL = "connectionURL";
public static final String CONNECTION_USERNAME = "connectionUsername";
@ -68,6 +66,8 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
public static final String WRITE_BASE = "writeBAse";
public static final String WRITE_ATTRIBUTE = "writeAttribute";
private static final Log LOG = LogFactory.getLog(LDAPLoginModule.class);
private String initialContextFactory;
private String connectionURL;
private String connectionUsername;
@ -135,31 +135,30 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
queueSearchSubtreeBool = Boolean.valueOf(queueSearchSubtree).booleanValue();
}
public Set getTempDestinationAdminACLs() {
// TODO insert implementation
return null;
}
public Set getTempDestinationReadACLs() {
public Set<GroupPrincipal> getTempDestinationAdminACLs() {
// TODO insert implementation
return null;
}
public Set getTempDestinationWriteACLs() {
public Set<GroupPrincipal> getTempDestinationReadACLs() {
// TODO insert implementation
return null;
}
public Set getAdminACLs(ActiveMQDestination destination) {
public Set<GroupPrincipal> getTempDestinationWriteACLs() {
// TODO insert implementation
return null;
}
public Set<GroupPrincipal> getAdminACLs(ActiveMQDestination destination) {
return getACLs(destination, adminBase, adminAttribute);
}
public Set getReadACLs(ActiveMQDestination destination) {
public Set<GroupPrincipal> getReadACLs(ActiveMQDestination destination) {
return getACLs(destination, readBase, readAttribute);
}
public Set getWriteACLs(ActiveMQDestination destination) {
public Set<GroupPrincipal> getWriteACLs(ActiveMQDestination destination) {
return getACLs(destination, writeBase, writeAttribute);
}
@ -304,12 +303,12 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
// Implementation methods
// -------------------------------------------------------------------------
protected Set getACLs(ActiveMQDestination destination, String roleBase, String roleAttribute) {
protected Set<GroupPrincipal> getACLs(ActiveMQDestination destination, String roleBase, String roleAttribute) {
try {
context = open();
} catch (NamingException e) {
log.error(e);
return new HashSet();
LOG.error(e);
return new HashSet<GroupPrincipal>();
}
// if ((destination.getDestinationType() &
@ -340,8 +339,8 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
constraints.setReturningAttributes(new String[] {roleAttribute});
try {
Set roles = new HashSet();
Set acls = new HashSet();
Set<GroupPrincipal> roles = new HashSet<GroupPrincipal>();
Set<String> acls = new HashSet<String>();
NamingEnumeration results = context.search(destinationBase, roleBase, constraints);
while (results.hasMore()) {
SearchResult result = (SearchResult)results.next();
@ -351,23 +350,23 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
}
acls = addAttributeValues(roleAttribute, attrs, acls);
}
for (Iterator iter = acls.iterator(); iter.hasNext();) {
String roleName = (String)iter.next();
for (Iterator<String> iter = acls.iterator(); iter.hasNext();) {
String roleName = iter.next();
roles.add(new GroupPrincipal(roleName));
}
return roles;
} catch (NamingException e) {
log.error(e);
return new HashSet();
LOG.error(e);
return new HashSet<GroupPrincipal>();
}
}
protected Set addAttributeValues(String attrId, Attributes attrs, Set values) throws NamingException {
protected Set<String> addAttributeValues(String attrId, Attributes attrs, Set<String> values) throws NamingException {
if (attrId == null || attrs == null) {
return values;
}
if (values == null) {
values = new HashSet();
values = new HashSet<String>();
}
Attribute attr = attrs.get(attrId);
if (attr == null) {
@ -387,7 +386,7 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
}
try {
Hashtable env = new Hashtable();
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
if (connectionUsername != null || !"".equals(connectionUsername)) {
env.put(Context.SECURITY_PRINCIPAL, connectionUsername);
@ -401,7 +400,7 @@ public class LDAPAuthorizationMap implements AuthorizationMap {
context = new InitialDirContext(env);
} catch (NamingException e) {
log.error(e);
LOG.error(e);
throw e;
}
return context;

View File

@ -21,6 +21,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.ActiveMQDestination;
/**
* Used to cache up authorizations so that subsequent requests are faster.
*
@ -34,37 +36,38 @@ public abstract class SecurityContext {
return true;
}
public Set getPrincipals() {
@SuppressWarnings("unchecked")
public Set<?> getPrincipals() {
return Collections.EMPTY_SET;
}
};
final String userName;
final ConcurrentHashMap authorizedReadDests = new ConcurrentHashMap();
final ConcurrentHashMap authorizedWriteDests = new ConcurrentHashMap();
final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
public SecurityContext(String userName) {
this.userName = userName;
}
public boolean isInOneOf(Set allowedPrincipals) {
HashSet set = new HashSet(getPrincipals());
public boolean isInOneOf(Set<?> allowedPrincipals) {
HashSet<?> set = new HashSet<Object>(getPrincipals());
set.retainAll(allowedPrincipals);
return set.size() > 0;
}
public abstract Set getPrincipals();
public abstract Set<?> getPrincipals();
public String getUserName() {
return userName;
}
public ConcurrentHashMap getAuthorizedReadDests() {
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
return authorizedReadDests;
}
public ConcurrentHashMap getAuthorizedWriteDests() {
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
return authorizedWriteDests;
}

View File

@ -54,7 +54,7 @@ public class SimpleAuthenticationBroker extends BrokerFilter {
final Set groups = (Set)userGroups.get(info.getUserName());
SecurityContext s = new SecurityContext(info.getUserName()) {
public Set getPrincipals() {
public Set<?> getPrincipals() {
return groups;
}
};

View File

@ -63,7 +63,7 @@ public class SimpleAuthorizationMap implements AuthorizationMap {
return this.tempDestinationAuthorizationEntry;
}
public Set getTempDestinationAdminACLs() {
public Set<Object> getTempDestinationAdminACLs() {
if (tempDestinationAuthorizationEntry != null) {
return tempDestinationAuthorizationEntry.getAdminACLs();
} else {
@ -71,7 +71,7 @@ public class SimpleAuthorizationMap implements AuthorizationMap {
}
}
public Set getTempDestinationReadACLs() {
public Set<Object> getTempDestinationReadACLs() {
if (tempDestinationAuthorizationEntry != null) {
return tempDestinationAuthorizationEntry.getReadACLs();
} else {
@ -79,7 +79,7 @@ public class SimpleAuthorizationMap implements AuthorizationMap {
}
}
public Set getTempDestinationWriteACLs() {
public Set<Object> getTempDestinationWriteACLs() {
if (tempDestinationAuthorizationEntry != null) {
return tempDestinationAuthorizationEntry.getWriteACLs();
} else {
@ -87,15 +87,18 @@ public class SimpleAuthorizationMap implements AuthorizationMap {
}
}
public Set getAdminACLs(ActiveMQDestination destination) {
@SuppressWarnings("unchecked")
public Set<Object> getAdminACLs(ActiveMQDestination destination) {
return adminACLs.get(destination);
}
public Set getReadACLs(ActiveMQDestination destination) {
@SuppressWarnings("unchecked")
public Set<Object> getReadACLs(ActiveMQDestination destination) {
return readACLs.get(destination);
}
public Set getWriteACLs(ActiveMQDestination destination) {
@SuppressWarnings("unchecked")
public Set<Object> getWriteACLs(ActiveMQDestination destination) {
return writeACLs.get(destination);
}

View File

@ -21,7 +21,6 @@ import org.apache.activemq.command.ProducerInfo;
public class ProducerState {
final ProducerInfo info;
private long lastSequenceId = -1;
public ProducerState(ProducerInfo info) {
this.info = info;

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
@ -45,7 +46,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
private Map<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
super(adapter, topicReferenceStore, destinationName);
@ -157,7 +158,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
@Override
protected Location doAsyncWrite() throws IOException {
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
final Map<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations;

View File

@ -19,6 +19,8 @@ package org.apache.activemq.store.amq;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -34,7 +36,7 @@ import org.apache.activemq.kaha.impl.async.Location;
public class AMQTx {
private final Location location;
private ArrayList<AMQTxOperation> operations = new ArrayList<AMQTxOperation>();
private List<AMQTxOperation> operations = new ArrayList<AMQTxOperation>();
public AMQTx(Location location) {
this.location = location;
@ -56,7 +58,7 @@ public class AMQTx {
}
public Message[] getMessages() {
ArrayList<Object> list = new ArrayList<Object>();
List<Object> list = new ArrayList<Object>();
for (Iterator<AMQTxOperation> iter = operations.iterator(); iter.hasNext();) {
AMQTxOperation op = iter.next();
if (op.getOperationType() == AMQTxOperation.ADD_OPERATION_TYPE) {
@ -69,7 +71,7 @@ public class AMQTx {
}
public MessageAck[] getAcks() {
ArrayList<Object> list = new ArrayList<Object>();
List<Object> list = new ArrayList<Object>();
for (Iterator<AMQTxOperation> iter = operations.iterator(); iter.hasNext();) {
AMQTxOperation op = iter.next();
if (op.getOperationType() == AMQTxOperation.REMOVE_OPERATION_TYPE) {
@ -88,11 +90,11 @@ public class AMQTx {
return this.location;
}
public ArrayList<AMQTxOperation> getOperations() {
public List<AMQTxOperation> getOperations() {
return operations;
}
public void setOperations(ArrayList<AMQTxOperation> operations) {
public void setOperations(List<AMQTxOperation> operations) {
this.operations = operations;
}
}

View File

@ -28,7 +28,7 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
private static final Log LOG = LogFactory.getLog(RecoveryListenerAdapter.class);
private final MessageStore store;
private final MessageRecoveryListener listener;
private int count = 0;
private int count;
private MessageId lastRecovered;
RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) {

View File

@ -68,7 +68,7 @@ public interface JDBCAdapter {
long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException;
void setUseExternalMessageReferences(boolean useExternalMessageReferences);

Some files were not shown because too many files have changed in this diff Show More