Fix instability in test cases

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1429809 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-01-07 14:32:20 +00:00
parent 5a8ca0b8a8
commit 667237f72a
3 changed files with 46 additions and 12 deletions

View File

@ -85,6 +85,7 @@ class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5; private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
private static final int DEFAULT_CACHE_SIZE = 5000;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1); private final SessionId sessionId = new SessionId(connectionId, -1);
@ -95,10 +96,10 @@ class MQTTProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>(); private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>(); private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(); private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(); private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(); private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
private final MQTTTransport mqttTransport; private final MQTTTransport mqttTransport;
private final Object commnadIdMutex = new Object(); private final Object commnadIdMutex = new Object();
@ -108,6 +109,7 @@ class MQTTProtocolConverter {
private CONNECT connect; private CONNECT connect;
private String clientId; private String clientId;
private long defaultKeepAlive; private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch=1;
private final String QOS_PROPERTY_NAME = "QoSPropertyName"; private final String QOS_PROPERTY_NAME = "QoSPropertyName";
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) { public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
@ -125,7 +127,7 @@ class MQTTProtocolConverter {
command.setCommandId(generateCommandId()); command.setCommandId(generateCommandId());
if (handler != null) { if (handler != null) {
command.setResponseRequired(true); command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); resposeHandlers.put(command.getCommandId(), handler);
} }
mqttTransport.sendToActiveMQ(command); mqttTransport.sendToActiveMQ(command);
} }
@ -297,7 +299,7 @@ class MQTTProtocolConverter {
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id); ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination); consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(1000); consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
if (!connect.cleanSession() && (connect.clientId() != null)) { if (!connect.cleanSession() && (connect.clientId() != null)) {
//by default subscribers are persistent //by default subscribers are persistent
@ -316,8 +318,8 @@ class MQTTProtocolConverter {
void onUnSubscribe(UNSUBSCRIBE command) { void onUnSubscribe(UNSUBSCRIBE command) {
UTF8Buffer[] topics = command.topics(); UTF8Buffer[] topics = command.topics();
if (topics != null) { if (topics != null) {
for (int i = 0; i < topics.length; i++) { for (UTF8Buffer topic : topics) {
onUnSubscribe(topics[i]); onUnSubscribe(topic);
} }
} }
UNSUBACK ack = new UNSUBACK(); UNSUBACK ack = new UNSUBACK();
@ -332,7 +334,10 @@ class MQTTProtocolConverter {
if (info != null) { if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId()); subscriptionsByConsumerId.remove(info.getConsumerId());
} }
RemoveInfo removeInfo = info.createRemoveCommand(); RemoveInfo removeInfo = null;
if (info != null) {
removeInfo = info.createRemoveCommand();
}
sendToActiveMQ(removeInfo, null); sendToActiveMQ(removeInfo, null);
} }
} }
@ -441,7 +446,7 @@ class MQTTProtocolConverter {
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
ActiveMQTopic topic = null; ActiveMQTopic topic;
synchronized (activeMQTopicMap) { synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName()); topic = activeMQTopicMap.get(command.topicName());
if (topic == null) { if (topic == null) {
@ -679,9 +684,23 @@ class MQTTProtocolConverter {
* Set the default keep alive time (in milliseconds) that would be used if configured on server side * Set the default keep alive time (in milliseconds) that would be used if configured on server side
* and the client sends a keep-alive value of 0 (zero) on a CONNECT frame * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
* *
* @param defaultKeepAlive * @param defaultKeepAlive the keepAlive in milliseconds
*/ */
public void setDefaultKeepAlive(long defaultKeepAlive) { public void setDefaultKeepAlive(long defaultKeepAlive) {
this.defaultKeepAlive = defaultKeepAlive; this.defaultKeepAlive = defaultKeepAlive;
} }
public int getActiveMQSubscriptionPrefetch() {
return activeMQSubscriptionPrefetch;
}
/**
* set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
* The default = 1
* @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
*/
public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
}
} }

View File

@ -153,5 +153,19 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
protocolConverter.setDefaultKeepAlive(defaultHeartBeat); protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
} }
public int getActiveMQSubscriptionPrefetch() {
return protocolConverter.getActiveMQSubscriptionPrefetch();
}
/**
* set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
* The default = 1
* @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
*/
public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
}
} }

View File

@ -78,7 +78,7 @@ public class MQTTTest {
brokerService.setPersistent(false); brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false); brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false); brokerService.setUseJmx(false);
this.numberOfMessages = 1000; this.numberOfMessages = 3000;
} }
@After @After
@ -200,6 +200,7 @@ public class MQTTTest {
assertNotNull("Should get a message", message); assertNotNull("Should get a message", message);
LOG.debug(payload); LOG.debug(payload);
message.ack(); message.ack();
//System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }
subConnection.disconnect(); subConnection.disconnect();