From 59f8cfc6049ff2d9cd7b50b1f558259943db4b8d Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 12 Aug 2014 14:59:51 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5290 Minor cleanup, change transport option from subscriptionStrategyName to just subscriptionStrategy. --- .../transport/mqtt/MQTTProtocolConverter.java | 25 ++++++------ .../transport/mqtt/MQTTTransportFilter.java | 8 ++-- .../MQTTVirtualTopicSubscriptionsTest.java | 2 +- .../MQTTNetworkOfBrokersFailoverTest.java | 39 +++++++++++-------- 4 files changed, 40 insertions(+), 34 deletions(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 62a6f515f9..b4d21ce356 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -88,7 +88,7 @@ public class MQTTProtocolConverter { private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); - private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5; + private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; static final int DEFAULT_CACHE_SIZE = 5000; private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); @@ -114,11 +114,12 @@ public class MQTTProtocolConverter { private String clientId; private long defaultKeepAlive; private int activeMQSubscriptionPrefetch = 1; - protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; + private static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; private final MQTTPacketIdGenerator packetIdGenerator; private boolean publishDollarTopics; private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); + /* * Subscription strategy configuration element. * > mqtt-default-subscriptions @@ -146,7 +147,7 @@ public class MQTTProtocolConverter { if (command instanceof ActiveMQMessage) { ActiveMQMessage msg = (ActiveMQMessage) command; try { - if (!getPublishDollarTopics() && getSubscriptionStrategy().isControlTopic(msg.getDestination())) { + if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) { // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 // specification requirements for system assigned destinations. if (handler != null) { @@ -322,7 +323,7 @@ public class MQTTProtocolConverter { packetIdGenerator.startClientSession(getClientId()); } - getSubscriptionStrategy().onConnect(connect); + findSubscriptionStrategy().onConnect(connect); } }); } @@ -345,7 +346,7 @@ public class MQTTProtocolConverter { byte[] qos = new byte[topics.length]; for (int i = 0; i < topics.length; i++) { try { - qos[i] = getSubscriptionStrategy().onSubscribe(topics[i]); + qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); } catch (IOException e) { throw new MQTTProtocolException("Failed to process subscription request", true, e); } @@ -369,7 +370,7 @@ public class MQTTProtocolConverter { if (topics != null) { for (UTF8Buffer topic : topics) { try { - getSubscriptionStrategy().onUnSubscribe(topic.toString()); + findSubscriptionStrategy().onUnSubscribe(topic.toString()); } catch (IOException e) { throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); } @@ -398,7 +399,7 @@ public class MQTTProtocolConverter { } } else if (command.isMessageDispatch()) { MessageDispatch md = (MessageDispatch) command; - MQTTSubscription sub = getSubscriptionStrategy().getSubscription(md.getConsumerId()); + MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId()); if (sub != null) { MessageAck ack = sub.createMessageAck(md); PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); @@ -502,7 +503,7 @@ public class MQTTProtocolConverter { String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); try { - destination = getSubscriptionStrategy().onSend(topicName); + destination = findSubscriptionStrategy().onSend(topicName); } catch (IOException e) { throw JMSExceptionSupport.create(e); } @@ -536,7 +537,7 @@ public class MQTTProtocolConverter { synchronized (mqttTopicMap) { topicName = mqttTopicMap.get(message.getJMSDestination()); if (topicName == null) { - String amqTopicName = getSubscriptionStrategy().onSend(message.getDestination()); + String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); mqttTopicMap.put(message.getJMSDestination(), topicName); } @@ -766,11 +767,11 @@ public class MQTTProtocolConverter { return this.connect.cleanSession(); } - public String getSubscriptionStrategyName() { + public String getSubscriptionStrategy() { return subscriptionStrategyName; } - public void setSubscriptionStrategyName(String name) { + public void setSubscriptionStrategy(String name) { this.subscriptionStrategyName = name; } @@ -785,7 +786,7 @@ public class MQTTProtocolConverter { return clientId; } - protected MQTTSubscriptionStrategy getSubscriptionStrategy() throws IOException { + protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { if (subsciptionStrategy == null) { synchronized (STRATAGY_FINDER) { if (subsciptionStrategy != null) { diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index ae557ab91f..7c1566fa18 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -211,12 +211,12 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor protocolConverter.setPublishDollarTopics(publishDollarTopics); } - public String getSubscriptionStrategyName() { - return protocolConverter != null ? protocolConverter.getSubscriptionStrategyName() : "default"; + public String getSubscriptionStrategy() { + return protocolConverter != null ? protocolConverter.getSubscriptionStrategy() : "default"; } - public void setSubscriptionStrategyName(String name) { - protocolConverter.setSubscriptionStrategyName(name); + public void setSubscriptionStrategy(String name) { + protocolConverter.setSubscriptionStrategy(name); } public int getActiveMQSubscriptionPrefetch() { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java index 6605f53d92..b4f985c305 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java @@ -28,7 +28,7 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { @Override @Before public void setUp() throws Exception { - protocolConfig = "transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions"; + protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; super.setUp(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java index e1ab1833cc..203cacd4a0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java @@ -16,6 +16,19 @@ */ package org.apache.activemq; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.management.ObjectName; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; @@ -26,20 +39,16 @@ import org.apache.activemq.network.NetworkTestSupport; import org.apache.activemq.store.PersistenceAdapter; import org.apache.commons.lang.ArrayUtils; import org.fusesource.hawtdispatch.Dispatch; -import org.fusesource.mqtt.client.*; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; -import javax.jms.Message; -import javax.management.ObjectName; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * Created by ceposta *