This commit is contained in:
Dejan Bosanac 2015-01-26 12:53:29 +01:00
parent a2c5c22ec5
commit d5470254af
4 changed files with 20 additions and 5 deletions

View File

@ -115,7 +115,7 @@ public class MQTTProtocolConverter {
private CONNECT connect;
private String clientId;
private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch = 1;
private int activeMQSubscriptionPrefetch = -1;
private final MQTTPacketIdGenerator packetIdGenerator;
private boolean publishDollarTopics;

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
@ -70,12 +71,17 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when clean session is false
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH);
}
if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
}
return doSubscribe(consumerInfo, topicName, requestedQoS);

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -85,21 +86,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
@Override
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
ActiveMQDestination destination = null;
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." +
VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
destination = new ActiveMQQueue(converted);
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
} else {
String converted = convertMQTTToActiveMQ(topicName);
if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
}
destination = new ActiveMQTopic(converted);
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
}
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
}
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
@ -211,7 +216,10 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
consumerInfo.setDestination(queue);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
}
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);

View File

@ -151,6 +151,7 @@ public class MQTTTest extends MQTTTestSupport {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Message " + i;
if (i == NUM_MESSAGES / 2) {
latch.await(20, TimeUnit.SECONDS);
subscriptionProvider.unsubscribe(topic);
}
publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);