Declare ConcurrentMaps instead of ConcurrentHashMaps
See PR #88 for discussion.
This commit is contained in:
parent
1cb372bcd8
commit
2a647c176f
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
public class FactoryFinder
|
public class FactoryFinder
|
||||||
{
|
{
|
||||||
|
@ -51,7 +52,7 @@ public class FactoryFinder
|
||||||
*/
|
*/
|
||||||
protected static class StandaloneObjectFactory implements ObjectFactory
|
protected static class StandaloneObjectFactory implements ObjectFactory
|
||||||
{
|
{
|
||||||
final ConcurrentHashMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
|
final ConcurrentMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
|
||||||
|
|
||||||
public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException
|
public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException
|
||||||
{
|
{
|
||||||
|
|
|
@ -35,12 +35,12 @@ public class MQTTSessionState
|
||||||
|
|
||||||
private ServerMessage willMessage;
|
private ServerMessage willMessage;
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
|
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
|
||||||
private Map<Integer, MQTTMessageInfo> messageRefStore;
|
private Map<Integer, MQTTMessageInfo> messageRefStore;
|
||||||
|
|
||||||
private ConcurrentHashMap<String, Map<Long, Integer>> addressMessageMap;
|
private ConcurrentMap<String, Map<Long, Integer>> addressMessageMap;
|
||||||
|
|
||||||
private Set<Integer> pubRec;
|
private Set<Integer> pubRec;
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ public class MQTTSessionState
|
||||||
// Objects track the Outbound message references
|
// Objects track the Outbound message references
|
||||||
private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
|
private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
|
||||||
|
|
||||||
private ConcurrentMap<String, ConcurrentHashMap<Long, Integer>> reverseOutboundReferenceStore;
|
private ConcurrentMap<String, ConcurrentMap<Long, Integer>> reverseOutboundReferenceStore;
|
||||||
|
|
||||||
private final Object outboundLock = new Object();
|
private final Object outboundLock = new Object();
|
||||||
|
|
||||||
|
|
|
@ -25,14 +25,15 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
public class MQTTSubscriptionManager
|
public class MQTTSubscriptionManager
|
||||||
{
|
{
|
||||||
private MQTTSession session;
|
private MQTTSession session;
|
||||||
|
|
||||||
private ConcurrentHashMap<Long, Integer> consumerQoSLevels;
|
private ConcurrentMap<Long, Integer> consumerQoSLevels;
|
||||||
|
|
||||||
private ConcurrentHashMap<String, ServerConsumer> consumers;
|
private ConcurrentMap<String, ServerConsumer> consumers;
|
||||||
|
|
||||||
private MQTTLogger log = MQTTLogger.LOGGER;
|
private MQTTLogger log = MQTTLogger.LOGGER;
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
@ -117,7 +118,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
||||||
|
|
||||||
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
|
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
|
||||||
|
|
||||||
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
|
protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
|
||||||
|
|
||||||
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
|
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
import org.apache.activemq.broker.region.MessageReference;
|
||||||
|
@ -38,7 +38,7 @@ public class AMQConnectionContext
|
||||||
private OpenWireProtocolManager broker; //use protocol manager to represent the broker
|
private OpenWireProtocolManager broker; //use protocol manager to represent the broker
|
||||||
private boolean inRecoveryMode;
|
private boolean inRecoveryMode;
|
||||||
private AMQTransaction transaction;
|
private AMQTransaction transaction;
|
||||||
private ConcurrentHashMap<TransactionId, AMQTransaction> transactions;
|
private ConcurrentMap<TransactionId, AMQTransaction> transactions;
|
||||||
private AMQSecurityContext securityContext;
|
private AMQSecurityContext securityContext;
|
||||||
private ConnectionId connectionId;
|
private ConnectionId connectionId;
|
||||||
private String clientId;
|
private String clientId;
|
||||||
|
@ -216,13 +216,13 @@ public class AMQConnectionContext
|
||||||
this.inRecoveryMode = inRecoveryMode;
|
this.inRecoveryMode = inRecoveryMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConcurrentHashMap<TransactionId, AMQTransaction> getTransactions()
|
public ConcurrentMap<TransactionId, AMQTransaction> getTransactions()
|
||||||
{
|
{
|
||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTransactions(
|
public void setTransactions(
|
||||||
ConcurrentHashMap<TransactionId, AMQTransaction> transactions)
|
ConcurrentMap<TransactionId, AMQTransaction> transactions)
|
||||||
{
|
{
|
||||||
this.transactions = transactions;
|
this.transactions = transactions;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
|
||||||
|
@ -45,8 +46,8 @@ public abstract class AMQSecurityContext
|
||||||
|
|
||||||
final String userName;
|
final String userName;
|
||||||
|
|
||||||
final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
|
final ConcurrentMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
|
||||||
final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
|
final ConcurrentMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>();
|
||||||
|
|
||||||
public AMQSecurityContext(String userName)
|
public AMQSecurityContext(String userName)
|
||||||
{
|
{
|
||||||
|
@ -77,12 +78,12 @@ public abstract class AMQSecurityContext
|
||||||
return userName;
|
return userName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests()
|
public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests()
|
||||||
{
|
{
|
||||||
return authorizedReadDests;
|
return authorizedReadDests;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests()
|
public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests()
|
||||||
{
|
{
|
||||||
return authorizedWriteDests;
|
return authorizedWriteDests;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,12 @@ package org.proton.plug.test.minimalserver;
|
||||||
|
|
||||||
import java.util.concurrent.BlockingDeque;
|
import java.util.concurrent.BlockingDeque;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
|
||||||
public class DumbServer
|
public class DumbServer
|
||||||
{
|
{
|
||||||
static ConcurrentHashMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>();
|
static ConcurrentMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public static BlockingDeque getQueue(String name)
|
public static BlockingDeque getQueue(String name)
|
||||||
{
|
{
|
||||||
|
|
|
@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.UriBuilder;
|
import javax.ws.rs.core.UriBuilder;
|
||||||
import javax.ws.rs.core.UriInfo;
|
import javax.ws.rs.core.UriInfo;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -39,7 +40,7 @@ import org.apache.activemq.artemis.rest.util.TimeoutTask;
|
||||||
|
|
||||||
public class ConsumersResource implements TimeoutTask.Callback
|
public class ConsumersResource implements TimeoutTask.Callback
|
||||||
{
|
{
|
||||||
protected ConcurrentHashMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>();
|
protected ConcurrentMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>();
|
||||||
protected ClientSessionFactory sessionFactory;
|
protected ClientSessionFactory sessionFactory;
|
||||||
protected String destination;
|
protected String destination;
|
||||||
protected final String startup = Long.toString(System.currentTimeMillis());
|
protected final String startup = Long.toString(System.currentTimeMillis());
|
||||||
|
|
|
@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.UriBuilder;
|
import javax.ws.rs.core.UriBuilder;
|
||||||
import javax.ws.rs.core.UriInfo;
|
import javax.ws.rs.core.UriInfo;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -45,7 +46,7 @@ import org.apache.activemq.artemis.rest.util.TimeoutTask;
|
||||||
|
|
||||||
public class SubscriptionsResource implements TimeoutTask.Callback
|
public class SubscriptionsResource implements TimeoutTask.Callback
|
||||||
{
|
{
|
||||||
protected ConcurrentHashMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>();
|
protected ConcurrentMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>();
|
||||||
protected ClientSessionFactory sessionFactory;
|
protected ClientSessionFactory sessionFactory;
|
||||||
protected String destination;
|
protected String destination;
|
||||||
protected final String startup = Long.toString(System.currentTimeMillis());
|
protected final String startup = Long.toString(System.currentTimeMillis());
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
|
@ -57,7 +58,7 @@ public class AMQ4062Test {
|
||||||
|
|
||||||
private BrokerService service;
|
private BrokerService service;
|
||||||
private PolicyEntry policy;
|
private PolicyEntry policy;
|
||||||
private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
|
private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
|
||||||
|
|
||||||
private static final int PREFETCH_SIZE_5=5;
|
private static final int PREFETCH_SIZE_5=5;
|
||||||
private String connectionUri;
|
private String connectionUri;
|
||||||
|
@ -174,17 +175,17 @@ public class AMQ4062Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
|
private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
|
||||||
if(durableSubscriptions!=null) return durableSubscriptions;
|
if(durableSubscriptions!=null) return durableSubscriptions;
|
||||||
RegionBroker regionBroker=(RegionBroker)service.getRegionBroker();
|
RegionBroker regionBroker=(RegionBroker)service.getRegionBroker();
|
||||||
TopicRegion region=(TopicRegion)regionBroker.getTopicRegion();
|
TopicRegion region=(TopicRegion)regionBroker.getTopicRegion();
|
||||||
Field field=TopicRegion.class.getDeclaredField("durableSubscriptions");
|
Field field=TopicRegion.class.getDeclaredField("durableSubscriptions");
|
||||||
field.setAccessible(true);
|
field.setAccessible(true);
|
||||||
durableSubscriptions=(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>)field.get(region);
|
durableSubscriptions=(ConcurrentMap<SubscriptionKey, DurableTopicSubscription>)field.get(region);
|
||||||
return durableSubscriptions;
|
return durableSubscriptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ConsumerInfo getConsumerInfo(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
|
private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
|
||||||
ConsumerInfo info=null;
|
ConsumerInfo info=null;
|
||||||
for(Iterator<DurableTopicSubscription> it=durableSubscriptions.values().iterator();it.hasNext();){
|
for(Iterator<DurableTopicSubscription> it=durableSubscriptions.values().iterator();it.hasNext();){
|
||||||
Subscription sub = it.next();
|
Subscription sub = it.next();
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.net.Socket;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
|
@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
|
||||||
public class SocketTstFactory extends SocketFactory {
|
public class SocketTstFactory extends SocketFactory {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SocketTstFactory.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SocketTstFactory.class);
|
||||||
|
|
||||||
private static final ConcurrentHashMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
|
private static final ConcurrentMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
|
||||||
|
|
||||||
private class SocketTst {
|
private class SocketTst {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue