Use ConcurrentMap in declarations
This commit is contained in:
Timothy Bish 2015-04-28 11:15:08 -04:00
parent bcdf770c10
commit 9ef4259297
43 changed files with 312 additions and 193 deletions

View File

@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.Broker;
@ -70,14 +71,14 @@ public class AdvisoryBroker extends BrokerFilter {
private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
protected final ProducerId advisoryProducerId = new ProducerId();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();

View File

@ -17,7 +17,7 @@
package org.apache.activemq.broker;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.region.MessageReference;
@ -34,8 +34,8 @@ import org.apache.activemq.transaction.Transaction;
/**
* Used to hold context information needed to process requests sent to a broker.
*
*
*
*
*/
public class ConnectionContext {
@ -44,7 +44,7 @@ public class ConnectionContext {
private Broker broker;
private boolean inRecoveryMode;
private Transaction transaction;
private ConcurrentHashMap<TransactionId, Transaction> transactions;
private ConcurrentMap<TransactionId, Transaction> transactions;
private SecurityContext securityContext;
private ConnectionId connectionId;
private String clientId;
@ -64,20 +64,20 @@ public class ConnectionContext {
private XATransactionId xid;
public ConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
this.messageEvaluationContext = new MessageEvaluationContext();
}
public ConnectionContext(MessageEvaluationContext messageEvaluationContext) {
this.messageEvaluationContext=messageEvaluationContext;
this.messageEvaluationContext=messageEvaluationContext;
}
public ConnectionContext(ConnectionInfo info) {
this();
this();
setClientId(info.getClientId());
setUserName(info.getUserName());
setConnectionId(info.getConnectionId());
}
public ConnectionContext copy() {
ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext);
rc.connection = this.connection;
@ -196,11 +196,11 @@ public class ConnectionContext {
this.inRecoveryMode = inRecoveryMode;
}
public ConcurrentHashMap<TransactionId, Transaction> getTransactions() {
public ConcurrentMap<TransactionId, Transaction> getTransactions() {
return transactions;
}
public void setTransactions(ConcurrentHashMap<TransactionId, Transaction> transactions) {
public void setTransactions(ConcurrentMap<TransactionId, Transaction> transactions) {
this.transactions = transactions;
}

View File

@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@ -49,8 +51,8 @@ import org.slf4j.LoggerFactory;
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private final AtomicBoolean active = new AtomicBoolean();

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -69,7 +70,7 @@ public class Topic extends BaseDestination implements Task {
private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {

View File

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
*/
public class TopicRegion extends AbstractRegion {
private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
protected final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
private boolean keepDurableSubsActive;

View File

@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -126,8 +127,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@ -1564,7 +1565,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return configuration.isDuplex() || createdByDuplex;
}
public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
return subscriptionMapByRemoteId;
}

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@ -50,7 +51,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
protected URI localURI;
protected ConnectionFilter connectionFilter;
protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
protected ConcurrentMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
protected ServiceSupport serviceSupport = new ServiceSupport() {

View File

@ -16,6 +16,21 @@
*/
package org.apache.activemq.plugin;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
@ -29,19 +44,6 @@ import org.apache.activemq.command.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.JMException;
import javax.management.ObjectName;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* A plugin which allows the caching of the selector from a subscription queue.
* <p/>
@ -50,7 +52,6 @@ import java.util.concurrent.ConcurrentHashMap;
* <p/>
* This is influenced by code snippets developed by Maciej Rakowicz
*
* @author Roelof Naude roelof(dot)naude(at)gmail.com
* @see https://issues.apache.org/activemq/browse/AMQ-3004
* @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
*/
@ -62,7 +63,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
* The subscription's selector cache. We cache compiled expressions keyed
* by the target destination.
*/
private ConcurrentHashMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
private final File persistFile;
private boolean singleSelectorPerDestination = false;
@ -70,7 +71,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
private ObjectName objectName;
private boolean running = true;
private Thread persistThread;
private final Thread persistThread;
private long persistInterval = MAX_PERSIST_INTERVAL;
public static final long MAX_PERSIST_INTERVAL = 600000;
private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
@ -244,6 +245,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while (running) {
try {

View File

@ -39,8 +39,8 @@ import org.apache.activemq.command.ProducerInfo;
/**
* Verifies if a authenticated user can do an operation against the broker using
* an authorization map.
*
*
*
*
*/
public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMBean {
@ -95,7 +95,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
}
return true;
}
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);
@ -110,7 +110,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);
if (!checkDestinationAdmin(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination);
}
@ -205,7 +205,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
final SecurityContext securityContext = checkSecurityContext(producerExchange.getConnectionContext());
if (!securityContext.isBrokerContext() && !securityContext.getAuthorizedWriteDests().contains(messageSend.getDestination())) {
if (!securityContext.isBrokerContext() && !securityContext.getAuthorizedWriteDests().containsValue(messageSend.getDestination())) {
Set<?> allowedACLs = null;
if (!messageSend.getDestination().isTemporary()) {
@ -226,18 +226,22 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
// SecurityAdminMBean interface
// -------------------------------------------------------------------------
@Override
public void addQueueRole(String queue, String operation, String role) {
addDestinationRole(new ActiveMQQueue(queue), operation, role);
}
@Override
public void addTopicRole(String topic, String operation, String role) {
addDestinationRole(new ActiveMQTopic(topic), operation, role);
}
@Override
public void removeQueueRole(String queue, String operation, String role) {
removeDestinationRole(new ActiveMQQueue(queue), operation, role);
}
@Override
public void removeTopicRole(String topic, String operation, String role) {
removeDestinationRole(new ActiveMQTopic(topic), operation, role);
}
@ -248,15 +252,19 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
public void removeDestinationRole(javax.jms.Destination destination, String operation, String role) {
}
@Override
public void addRole(String role) {
}
@Override
public void addUserRole(String user, String role) {
}
@Override
public void removeRole(String role) {
}
@Override
public void removeUserRole(String user, String role) {
}

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.command.ActiveMQDestination;
@ -38,6 +39,7 @@ public abstract class SecurityContext {
return true;
}
@Override
public Set<Principal> getPrincipals() {
return Collections.emptySet();
}
@ -45,8 +47,8 @@ public abstract class SecurityContext {
final String userName;
final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
final ConcurrentMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
final ConcurrentMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
public SecurityContext(String userName) {
this.userName = userName;
@ -72,11 +74,11 @@ public abstract class SecurityContext {
return userName;
}
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
return authorizedReadDests;
}
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
return authorizedWriteDests;
}

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
@ -46,8 +47,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
MemoryTransactionStore transactionStore;
ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
private boolean useExternalMessageReferences;
@Override

View File

@ -16,6 +16,15 @@
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -32,14 +41,6 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
@ -48,7 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class MemoryTransactionStore implements TransactionStore {
protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
protected final PersistenceAdapter persistenceAdapter;
@ -228,6 +229,7 @@ public class MemoryTransactionStore implements TransactionStore {
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
@Override
public void prepare(TransactionId txid) throws IOException {
Tx tx = inflightTransactions.remove(txid);
if (tx == null) {
@ -254,6 +256,7 @@ public class MemoryTransactionStore implements TransactionStore {
return tx;
}
@Override
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
@ -276,17 +279,21 @@ public class MemoryTransactionStore implements TransactionStore {
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
@Override
public void rollback(TransactionId txid) throws IOException {
preparedTransactions.remove(txid);
inflightTransactions.remove(txid);
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
inflightTransactions.clear();
@ -320,6 +327,7 @@ public class MemoryTransactionStore implements TransactionStore {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
MessageStore messageStore = destination;
@Override
public Message getMessage() {
return message;
}
@ -329,6 +337,7 @@ public class MemoryTransactionStore implements TransactionStore {
return destination;
}
@Override
public void run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
}
@ -356,10 +365,12 @@ public class MemoryTransactionStore implements TransactionStore {
if (ack.isInTransaction()) {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand() {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public void run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
}
@ -383,10 +394,12 @@ public class MemoryTransactionStore implements TransactionStore {
if (ack.isInTransaction()) {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand() {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public void run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
}

View File

@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.BrokerFactoryHandler;
import org.apache.activemq.broker.BrokerService;
@ -37,16 +38,18 @@ import org.apache.activemq.util.URISupport;
public class PeerTransportFactory extends TransportFactory {
public static final ConcurrentHashMap BROKERS = new ConcurrentHashMap();
public static final ConcurrentHashMap CONNECTORS = new ConcurrentHashMap();
public static final ConcurrentHashMap SERVERS = new ConcurrentHashMap();
public static final ConcurrentMap BROKERS = new ConcurrentHashMap();
public static final ConcurrentMap CONNECTORS = new ConcurrentHashMap();
public static final ConcurrentMap SERVERS = new ConcurrentHashMap();
private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-");
@Override
public Transport doConnect(URI location) throws Exception {
VMTransportFactory vmTransportFactory = createTransportFactory(location);
return vmTransportFactory.doConnect(location);
}
@Override
public Transport doCompositeConnect(URI location) throws Exception {
VMTransportFactory vmTransportFactory = createTransportFactory(location);
return vmTransportFactory.doCompositeConnect(location);
@ -78,15 +81,18 @@ public class PeerTransportFactory extends TransportFactory {
final String finalBroker = broker;
final String finalGroup = group;
VMTransportFactory rc = new VMTransportFactory() {
@Override
public Transport doConnect(URI ignore) throws Exception {
return super.doConnect(finalLocation);
};
@Override
public Transport doCompositeConnect(URI ignore) throws Exception {
return super.doCompositeConnect(finalLocation);
};
};
rc.setBrokerFactoryHandler(new BrokerFactoryHandler() {
@Override
public BrokerService createBroker(URI brokerURI) throws Exception {
BrokerService service = new BrokerService();
IntrospectionSupport.setProperties(service, brokerOptions);
@ -104,6 +110,7 @@ public class PeerTransportFactory extends TransportFactory {
}
}
@Override
public TransportServer doBind(URI location) throws IOException {
throw new IOException("This protocol does not support being bound.");
}

View File

@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerFactoryHandler;
@ -43,9 +44,9 @@ import org.slf4j.MDC;
public class VMTransportFactory extends TransportFactory {
public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
public static final ConcurrentMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
public static final ConcurrentMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
public static final ConcurrentMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
BrokerFactoryHandler brokerFactoryHandler;

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.transport.LogWriter;
import org.apache.activemq.transport.TransportLoggerView;
@ -34,22 +35,22 @@ import org.slf4j.LoggerFactory;
* implementing LogWriter is specified by the files in the
* resources/META-INF/services/org/apache/activemq/transport/logwriters
* directory.
*
*
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com
*
*
*/
public class LogWriterFinder {
private static final Logger log = LoggerFactory.getLogger(TransportLoggerView.class);
private final String path;
private final ConcurrentHashMap classMap = new ConcurrentHashMap();
private final ConcurrentMap classMap = new ConcurrentHashMap();
/**
* Builds a LogWriterFinder that will look for the mappings between
* LogWriter names and classes in the directory "path".
* @param path The directory where the files that map log writer names to
* LogWriter classes are.
* LogWriter classes are.
*/
public LogWriterFinder(String path) {
this.path = path;
@ -75,7 +76,7 @@ public class LogWriterFinder {
}
return (LogWriter)clazz.newInstance();
}
/**
* Loads and returns a class given a Properties object with a "class" property.
* @param properties a Properties object with a "class" property.

View File

@ -16,16 +16,16 @@
*/
package org.apache.activemq.util.osgi;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.net.URL;
import org.apache.activemq.Service;
import org.apache.activemq.store.PersistenceAdapter;
@ -33,14 +33,13 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.FactoryFinder.ObjectFactory;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.SynchronousBundleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An OSGi bundle activator for ActiveMQ which adapts the {@link org.apache.activemq.util.FactoryFinder}
@ -51,7 +50,7 @@ public class Activator implements BundleActivator, SynchronousBundleListener, Ob
private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
private final ConcurrentHashMap<String, Class> serviceCache = new ConcurrentHashMap<String, Class>();
private final ConcurrentMap<String, Class> serviceCache = new ConcurrentHashMap<String, Class>();
private final ConcurrentMap<Long, BundleWrapper> bundleWrappers = new ConcurrentHashMap<Long, BundleWrapper>();
private BundleContext bundleContext;
@ -59,6 +58,7 @@ public class Activator implements BundleActivator, SynchronousBundleListener, Ob
// BundleActivator interface impl
// ================================================================
@Override
public synchronized void start(BundleContext bundleContext) throws Exception {
// This is how we replace the default FactoryFinder strategy
@ -79,6 +79,7 @@ public class Activator implements BundleActivator, SynchronousBundleListener, Ob
}
@Override
public synchronized void stop(BundleContext bundleContext) throws Exception {
debug("deactivating");
bundleContext.removeBundleListener(this);
@ -93,6 +94,7 @@ public class Activator implements BundleActivator, SynchronousBundleListener, Ob
// SynchronousBundleListener interface impl
// ================================================================
@Override
public void bundleChanged(BundleEvent event) {
if (event.getType() == BundleEvent.RESOLVED) {
register(event.getBundle());
@ -133,6 +135,7 @@ public class Activator implements BundleActivator, SynchronousBundleListener, Ob
// ObjectFactory interface impl
// ================================================================
@Override
public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
Class clazz = serviceCache.get(path);
if (clazz == null) {

View File

@ -22,14 +22,20 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@ -500,7 +506,7 @@ public abstract class PListTestSupport {
abstract protected PListStore createConcurrentAddIteratePListStore();
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
ConcurrentHashMap<String, Object> entries = new ConcurrentHashMap<String, Object>();
ConcurrentMap<String, Object> entries = new ConcurrentHashMap<String, Object>();
class Job implements Runnable {

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@ -114,7 +115,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true;
@ -172,8 +173,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
// Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@ -2100,7 +2101,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return false;
}
return !activeTempDestinations.contains(dest);
return !activeTempDestinations.containsValue(dest);
}
public boolean isCopyMessageOnSend() {
@ -2486,10 +2487,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return;
}
Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
= this.activeTempDestinations.entrySet().iterator();
while(entries.hasNext()) {
ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
try {
// Only delete this temp destination if it was created from this connection. The connection used
// for the advisory consumer may also have a reference to this temp destination.

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ActiveMQDestination;
@ -40,8 +41,8 @@ import org.apache.activemq.command.TransactionId;
public class ConnectionState {
ConnectionInfo info;
private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
private final ConcurrentMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
private final ConcurrentMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private boolean connectionInterruptProcessingComplete = true;
@ -53,6 +54,7 @@ public class ConnectionState {
addSession(new SessionInfo(info, -1));
}
@Override
public String toString() {
return info.toString();
}
@ -152,7 +154,7 @@ public class ConnectionState {
public void setConnectionInterruptProcessingComplete(boolean connectionInterruptProcessingComplete) {
this.connectionInterruptProcessingComplete = connectionInterruptProcessingComplete;
}
public boolean isConnectionInterruptProcessingComplete() {
return connectionInterruptProcessingComplete;
}

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAResource;
@ -60,7 +61,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
private static final int MESSAGE_PULL_SIZE = 400;
protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
protected final ConcurrentMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
private boolean trackTransactions;
private boolean restoreSessions = true;

View File

@ -16,13 +16,6 @@
*/
package org.apache.activemq.transport;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
@ -31,13 +24,21 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
public abstract class TransportFactory {
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
private static final ConcurrentMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
private static final String THREAD_NAME_FILTER = "threadName";
@ -191,7 +192,7 @@ public abstract class TransportFactory {
}
protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
String wireFormat = (String)options.remove("wireFormat");
String wireFormat = options.remove("wireFormat");
if (wireFormat == null) {
wireFormat = getDefaultWireFormatType();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.discovery;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
@ -26,7 +27,7 @@ import org.apache.activemq.util.IOExceptionSupport;
public abstract class DiscoveryAgentFactory {
private static final FactoryFinder DISCOVERY_AGENT_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/discoveryagent/");
private static final ConcurrentHashMap<String, DiscoveryAgentFactory> DISCOVERY_AGENT_FACTORYS = new ConcurrentHashMap<String, DiscoveryAgentFactory>();
private static final ConcurrentMap<String, DiscoveryAgentFactory> DISCOVERY_AGENT_FACTORYS = new ConcurrentHashMap<String, DiscoveryAgentFactory>();
/**
* @param uri

View File

@ -20,6 +20,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.TransportFilter;
@ -32,8 +34,8 @@ import org.slf4j.LoggerFactory;
/**
* A {@link ReliableTransportChannel} which uses a {@link DiscoveryAgent} to
* discover remote broker instances and dynamically connect to them.
*
*
*
*
*/
public class DiscoveryTransport extends TransportFilter implements DiscoveryListener {
@ -41,7 +43,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
private final CompositeTransport next;
private DiscoveryAgent discoveryAgent;
private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
private final ConcurrentMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
private Map<String, String> parameters;
@ -70,6 +72,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
ss.throwFirstException();
}
@Override
public void onServiceAdd(DiscoveryEvent event) {
String url = event.getServiceName();
if (url != null) {
@ -85,6 +88,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
}
}
@Override
public void onServiceRemove(DiscoveryEvent event) {
URI uri = serviceURIs.get(event.getServiceName());
if (uri != null) {
@ -101,7 +105,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
}
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
this.parameters = parameters;
}
@Override

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.Command;
@ -48,8 +49,8 @@ import org.slf4j.LoggerFactory;
/**
* A Transport that fans out a connection to multiple brokers.
*
*
*
*
*/
public class FanoutTransport implements CompositeTransport {
@ -61,7 +62,7 @@ public class FanoutTransport implements CompositeTransport {
private final Object reconnectMutex = new Object();
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
private final ConcurrentMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
private final TaskRunnerFactory reconnectTaskFactory;
private final TaskRunner reconnectTask;
@ -161,6 +162,7 @@ public class FanoutTransport implements CompositeTransport {
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();
reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
@Override
public boolean iterate() {
return doConnect();
}
@ -230,7 +232,7 @@ public class FanoutTransport implements CompositeTransport {
ServiceSupport.dispose(fanoutHandler.transport);
fanoutHandler.transport=null;
}
if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
connectionFailure = e;
@ -276,6 +278,7 @@ public class FanoutTransport implements CompositeTransport {
return true;
}
@Override
public void start() throws Exception {
synchronized (reconnectMutex) {
LOG.debug("Started.");
@ -293,6 +296,7 @@ public class FanoutTransport implements CompositeTransport {
}
}
@Override
public void stop() throws Exception {
try {
synchronized (reconnectMutex) {
@ -321,14 +325,14 @@ public class FanoutTransport implements CompositeTransport {
}
}
public int getMinAckCount() {
return minAckCount;
}
public int getMinAckCount() {
return minAckCount;
}
public void setMinAckCount(int minAckCount) {
this.minAckCount = minAckCount;
}
public void setMinAckCount(int minAckCount) {
this.minAckCount = minAckCount;
}
public long getInitialReconnectDelay() {
return initialReconnectDelay;
}
@ -361,6 +365,7 @@ public class FanoutTransport implements CompositeTransport {
this.maxReconnectAttempts = maxReconnectAttempts;
}
@Override
public void oneway(Object o) throws IOException {
final Command command = (Command)o;
try {
@ -401,7 +406,7 @@ public class FanoutTransport implements CompositeTransport {
int size = fanout ? minAckCount : 1;
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}
// Send the message.
if (fanout) {
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
@ -450,14 +455,17 @@ public class FanoutTransport implements CompositeTransport {
return true;
}
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
@Override
public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
@Override
public Object request(Object command, int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
@ -471,14 +479,17 @@ public class FanoutTransport implements CompositeTransport {
}
}
@Override
public TransportListener getTransportListener() {
return transportListener;
}
@Override
public void setTransportListener(TransportListener commandListener) {
this.transportListener = commandListener;
}
@Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
@ -511,6 +522,7 @@ public class FanoutTransport implements CompositeTransport {
}
}
@Override
public void add(boolean reblance,URI uris[]) {
synchronized (reconnectMutex) {
@ -535,6 +547,7 @@ public class FanoutTransport implements CompositeTransport {
}
@Override
public void remove(boolean rebalance,URI uris[]) {
synchronized (reconnectMutex) {
@ -556,24 +569,29 @@ public class FanoutTransport implements CompositeTransport {
}
}
@Override
public void reconnect(URI uri) throws IOException {
add(true,new URI[]{uri});
}
add(true,new URI[]{uri});
}
@Override
public boolean isReconnectSupported() {
return true;
}
@Override
public boolean isUpdateURIsSupported() {
return true;
}
@Override
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
add(reblance,uris);
}
@Override
public String getRemoteAddress() {
if (primary != null) {
if (primary.transport != null) {
@ -589,6 +607,7 @@ public class FanoutTransport implements CompositeTransport {
}
}
@Override
public boolean isFaultTolerant() {
return true;
}
@ -601,15 +620,18 @@ public class FanoutTransport implements CompositeTransport {
this.fanOutQueues = fanOutQueues;
}
public boolean isDisposed() {
return disposed;
}
@Override
public boolean isDisposed() {
return disposed;
}
@Override
public boolean isConnected() {
return connected;
}
@Override
public int getReceiveCounter() {
int rc = 0;
synchronized (reconnectMutex) {

View File

@ -21,9 +21,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
*
*
*/
public class FactoryFinder {
@ -39,7 +40,7 @@ public class FactoryFinder {
*/
public interface ObjectFactory {
/**
* @param path the full service path
* @param path the full service path
* @return
*/
public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;
@ -50,8 +51,9 @@ public class FactoryFinder {
* The default implementation of Object factory which works well in standalone applications.
*/
protected static class StandaloneObjectFactory implements ObjectFactory {
final ConcurrentHashMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
final ConcurrentMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
@Override
public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
Class clazz = classMap.get(path);
if (clazz == null) {
@ -146,5 +148,5 @@ public class FactoryFinder {
return objectFactory.create(path+key);
}
}

View File

@ -21,6 +21,8 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -31,23 +33,23 @@ import org.slf4j.LoggerFactory;
public class DiscoveryRegistryServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
long maxKeepAge = 1000*60*60; // 1 hour.
ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> serviceGroups = new ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>();
ConcurrentMap<String, ConcurrentMap<String, Long>> serviceGroups = new ConcurrentHashMap<String, ConcurrentMap<String, Long>>();
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String group = req.getPathInfo();
String service = req.getHeader("service");
LOG.debug("Registering: group="+group+", service="+service);
ConcurrentHashMap<String, Long> services = getServiceGroup(group);
ConcurrentMap<String, Long> services = getServiceGroup(group);
services.put(service, System.currentTimeMillis());
}
private ConcurrentHashMap<String, Long> getServiceGroup(String group) {
ConcurrentHashMap<String, Long> rc = serviceGroups.get(group);
private ConcurrentMap<String, Long> getServiceGroup(String group) {
ConcurrentMap<String, Long> rc = serviceGroups.get(group);
if( rc == null ) {
rc = new ConcurrentHashMap<String, Long>();
serviceGroups.put(group, rc);
@ -63,16 +65,16 @@ public class DiscoveryRegistryServlet extends HttpServlet {
if( p!=null ) {
freshness = Long.parseLong(p);
}
String group = req.getPathInfo();
LOG.debug("group="+group);
ConcurrentHashMap<String, Long> services = getServiceGroup(group);
ConcurrentMap<String, Long> services = getServiceGroup(group);
PrintWriter writer = resp.getWriter();
long now = System.currentTimeMillis();
long dropTime = now-maxKeepAge;
long dropTime = now-maxKeepAge;
long minimumTime = now-freshness;
ArrayList<String> dropList = new ArrayList<String>();
for (Map.Entry<String, Long> entry : services.entrySet()) {
if( entry.getValue() > minimumTime ) {
@ -81,26 +83,26 @@ public class DiscoveryRegistryServlet extends HttpServlet {
dropList.add(entry.getKey());
}
}
// We might as well get rid of the really old entries.
for (String service : dropList) {
services.remove(service);
}
} catch (Exception e) {
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error occured: "+e);
}
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String group = req.getPathInfo();
String service = req.getHeader("service");
LOG.debug("Unregistering: group="+group+", service="+service);
ConcurrentHashMap<String, Long> services = getServiceGroup(group);
ConcurrentMap<String, Long> services = getServiceGroup(group);
services.remove(service);
}
}

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
@ -96,8 +97,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private final WireFormat wireFormat = new OpenWireFormat();
private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private final ConcurrentMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
private final ConcurrentMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private SystemUsage usageManager;
private long checkpointInterval = 1000 * 60 * 5;

View File

@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -43,7 +44,7 @@ public class PooledConnectionFactoryMaximumActiveTest extends JmsPoolTestSupport
public static Connection conn = null;
public static int sleepTimeout = 5000;
private static ConcurrentHashMap<Integer, Session> sessions = new ConcurrentHashMap<Integer, Session>();
private static ConcurrentMap<Integer, Session> sessions = new ConcurrentHashMap<Integer, Session>();
public static void addSession(Session s) {
sessions.put(s.hashCode(), s);

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -233,7 +234,7 @@ public class PooledConnectionFactoryTest extends JmsPoolTestSupport {
cf.setCreateConnectionOnStartup(createOnStart);
cf.start();
final ConcurrentHashMap<ConnectionId, Connection> connections = new ConcurrentHashMap<ConnectionId, Connection>();
final ConcurrentMap<ConnectionId, Connection> connections = new ConcurrentHashMap<ConnectionId, Connection>();
final ExecutorService executor = Executors.newFixedThreadPool(numConnections);
for (int i = 0; i < numConnections; ++i) {

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -59,7 +60,7 @@ import org.slf4j.LoggerFactory;
*/
public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final KahaDBStore theStore;
public KahaDBTransactionStore(KahaDBStore theStore) {
@ -231,6 +232,7 @@ public class KahaDBTransactionStore implements TransactionStore {
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
@Override
public void prepare(TransactionId txid) throws IOException {
KahaTransactionInfo info = getTransactionInfo(txid);
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
@ -252,6 +254,7 @@ public class KahaDBTransactionStore implements TransactionStore {
return tx;
}
@Override
public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
throws IOException {
if (txid != null) {
@ -307,6 +310,7 @@ public class KahaDBTransactionStore implements TransactionStore {
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
@Override
public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
KahaTransactionInfo info = getTransactionInfo(txid);
@ -324,12 +328,15 @@ public class KahaDBTransactionStore implements TransactionStore {
}
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
@ -509,10 +516,12 @@ public class KahaDBTransactionStore implements TransactionStore {
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
return AbstractMessageStore.FUTURE;

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -55,7 +56,7 @@ import org.slf4j.LoggerFactory;
public class MultiKahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
private Journal journal;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
@ -100,7 +101,7 @@ public class MQTTProtocolConverter {
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt.strategy;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@ -58,8 +59,8 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
protected MQTTProtocolConverter protocol;
protected BrokerService brokerService;
protected final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
protected final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
protected final ConcurrentMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
protected final ConcurrentMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();

View File

@ -16,8 +16,27 @@
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.*;
import org.apache.activemq.command.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.apache.activemq.state.ConsumerState;
@ -27,12 +46,6 @@ import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* A BrokerFilter which partitions client connections over a cluster of brokers.
*
@ -267,7 +280,7 @@ public class PartitionBroker extends BrokerFilter {
return null;
}
protected final ConcurrentHashMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

View File

@ -16,14 +16,14 @@
*/
package org.apache.activemq.shiro.subject;
import java.security.Principal;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.security.SecurityContext;
import org.apache.shiro.subject.Subject;
import java.security.Principal;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* ActiveMQ {@code SecurityContext} implementation that retains a Shiro {@code Subject} instance for use during
* security checks and other security-related operations.
@ -73,12 +73,12 @@ public class SubjectSecurityContext extends SecurityContext {
}
@Override
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
throw notAllowed("getAuthorizedReadDests");
}
@Override
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
throw notAllowed("getAuthorizedWriteDests");
}

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@ -104,15 +105,15 @@ public class ProtocolConverter {
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final StompTransport stompTransport;
private final ConcurrentHashMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
private final Object commnadIdMutex = new Object();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.bugs;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@ -148,7 +149,7 @@ public class AMQ1936Test extends TestCase {
}
public void testForDuplicateMessages() throws Exception {
final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<String, String>();
final ConcurrentMap<String, String> messages = new ConcurrentHashMap<String, String>();
final Object lock = new Object();
final CountDownLatch duplicateSignal = new CountDownLatch(1);
final AtomicInteger messageCount = new AtomicInteger(0);

View File

@ -23,6 +23,7 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.jms.Destination;
@ -98,7 +99,7 @@ public class AMQ2364Test {
ConnectionStateTracker stateTracker = (ConnectionStateTracker) stateTrackerField.get(failoverTrans);
Field statesField = ConnectionStateTracker.class.getDeclaredField("connectionStates");
statesField.setAccessible(true);
ConcurrentHashMap<ConnectionId, ConnectionState> states =
ConcurrentMap<ConnectionId, ConnectionState> states =
(ConcurrentHashMap<ConnectionId, ConnectionState>) statesField.get(stateTracker);
ConnectionState state = states.get(connection.getConnectionInfo().getConnectionId());

View File

@ -22,7 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.jms.DeliveryMode;
@ -57,7 +57,7 @@ public class AMQ4062Test {
private BrokerService service;
private PolicyEntry policy;
private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
private static final int PREFETCH_SIZE_5=5;
private String connectionUri;
@ -174,17 +174,17 @@ public class AMQ4062Test {
}
@SuppressWarnings("unchecked")
private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
if(durableSubscriptions!=null) return durableSubscriptions;
RegionBroker regionBroker=(RegionBroker)service.getRegionBroker();
TopicRegion region=(TopicRegion)regionBroker.getTopicRegion();
Field field=TopicRegion.class.getDeclaredField("durableSubscriptions");
field.setAccessible(true);
durableSubscriptions=(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>)field.get(region);
durableSubscriptions=(ConcurrentMap<SubscriptionKey, DurableTopicSubscription>)field.get(region);
return durableSubscriptions;
}
private ConsumerInfo getConsumerInfo(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
ConsumerInfo info=null;
for(Iterator<DurableTopicSubscription> it=durableSubscriptions.values().iterator();it.hasNext();){
Subscription sub = it.next();

View File

@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -238,7 +238,7 @@ public class CompressionOverNetworkTest {
if (bridges.length > 0) {
LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
ConcurrentMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
if (!forwardingBridges.isEmpty()) {
for (DemandSubscription demandSubscription : forwardingBridges.values()) {

View File

@ -46,7 +46,7 @@ public class NetworkLoopBackTest {
}
});
final DemandForwardingBridgeSupport loopbackBridge = (DemandForwardingBridgeSupport) networkConnector.bridges.elements().nextElement();
final DemandForwardingBridgeSupport loopbackBridge = (DemandForwardingBridgeSupport) networkConnector.bridges.values().iterator().next();
assertTrue("nc started", networkConnector.isStarted());
assertTrue("It should get disposed", Wait.waitFor(new Wait.Condition() {

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -173,7 +173,7 @@ public class SimpleNetworkTest {
if (bridges.length > 0) {
LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
ConcurrentMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
if (!forwardingBridges.isEmpty()) {
for (DemandSubscription demandSubscription : forwardingBridges.values()) {

View File

@ -22,6 +22,7 @@ import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.net.SocketFactory;
@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class SocketTstFactory extends SocketFactory {
private static final Logger LOG = LoggerFactory.getLogger(SocketTstFactory.class);
private static final ConcurrentHashMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
private static final ConcurrentMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
private class SocketTst {