mirror of https://github.com/apache/activemq.git
Minor cleanup, change transport option from subscriptionStrategyName to just subscriptionStrategy.
This commit is contained in:
parent
ec049a0882
commit
59f8cfc604
|
@ -88,7 +88,7 @@ public class MQTTProtocolConverter {
|
|||
|
||||
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
|
||||
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
|
||||
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
|
||||
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
|
||||
static final int DEFAULT_CACHE_SIZE = 5000;
|
||||
|
||||
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
|
||||
|
@ -114,11 +114,12 @@ public class MQTTProtocolConverter {
|
|||
private String clientId;
|
||||
private long defaultKeepAlive;
|
||||
private int activeMQSubscriptionPrefetch = 1;
|
||||
protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
|
||||
private static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
|
||||
private final MQTTPacketIdGenerator packetIdGenerator;
|
||||
private boolean publishDollarTopics;
|
||||
|
||||
private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
|
||||
|
||||
/*
|
||||
* Subscription strategy configuration element.
|
||||
* > mqtt-default-subscriptions
|
||||
|
@ -146,7 +147,7 @@ public class MQTTProtocolConverter {
|
|||
if (command instanceof ActiveMQMessage) {
|
||||
ActiveMQMessage msg = (ActiveMQMessage) command;
|
||||
try {
|
||||
if (!getPublishDollarTopics() && getSubscriptionStrategy().isControlTopic(msg.getDestination())) {
|
||||
if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) {
|
||||
// We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
|
||||
// specification requirements for system assigned destinations.
|
||||
if (handler != null) {
|
||||
|
@ -322,7 +323,7 @@ public class MQTTProtocolConverter {
|
|||
packetIdGenerator.startClientSession(getClientId());
|
||||
}
|
||||
|
||||
getSubscriptionStrategy().onConnect(connect);
|
||||
findSubscriptionStrategy().onConnect(connect);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -345,7 +346,7 @@ public class MQTTProtocolConverter {
|
|||
byte[] qos = new byte[topics.length];
|
||||
for (int i = 0; i < topics.length; i++) {
|
||||
try {
|
||||
qos[i] = getSubscriptionStrategy().onSubscribe(topics[i]);
|
||||
qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
|
||||
} catch (IOException e) {
|
||||
throw new MQTTProtocolException("Failed to process subscription request", true, e);
|
||||
}
|
||||
|
@ -369,7 +370,7 @@ public class MQTTProtocolConverter {
|
|||
if (topics != null) {
|
||||
for (UTF8Buffer topic : topics) {
|
||||
try {
|
||||
getSubscriptionStrategy().onUnSubscribe(topic.toString());
|
||||
findSubscriptionStrategy().onUnSubscribe(topic.toString());
|
||||
} catch (IOException e) {
|
||||
throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
|
||||
}
|
||||
|
@ -398,7 +399,7 @@ public class MQTTProtocolConverter {
|
|||
}
|
||||
} else if (command.isMessageDispatch()) {
|
||||
MessageDispatch md = (MessageDispatch) command;
|
||||
MQTTSubscription sub = getSubscriptionStrategy().getSubscription(md.getConsumerId());
|
||||
MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
|
||||
if (sub != null) {
|
||||
MessageAck ack = sub.createMessageAck(md);
|
||||
PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
|
||||
|
@ -502,7 +503,7 @@ public class MQTTProtocolConverter {
|
|||
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
|
||||
|
||||
try {
|
||||
destination = getSubscriptionStrategy().onSend(topicName);
|
||||
destination = findSubscriptionStrategy().onSend(topicName);
|
||||
} catch (IOException e) {
|
||||
throw JMSExceptionSupport.create(e);
|
||||
}
|
||||
|
@ -536,7 +537,7 @@ public class MQTTProtocolConverter {
|
|||
synchronized (mqttTopicMap) {
|
||||
topicName = mqttTopicMap.get(message.getJMSDestination());
|
||||
if (topicName == null) {
|
||||
String amqTopicName = getSubscriptionStrategy().onSend(message.getDestination());
|
||||
String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination());
|
||||
topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
|
||||
mqttTopicMap.put(message.getJMSDestination(), topicName);
|
||||
}
|
||||
|
@ -766,11 +767,11 @@ public class MQTTProtocolConverter {
|
|||
return this.connect.cleanSession();
|
||||
}
|
||||
|
||||
public String getSubscriptionStrategyName() {
|
||||
public String getSubscriptionStrategy() {
|
||||
return subscriptionStrategyName;
|
||||
}
|
||||
|
||||
public void setSubscriptionStrategyName(String name) {
|
||||
public void setSubscriptionStrategy(String name) {
|
||||
this.subscriptionStrategyName = name;
|
||||
}
|
||||
|
||||
|
@ -785,7 +786,7 @@ public class MQTTProtocolConverter {
|
|||
return clientId;
|
||||
}
|
||||
|
||||
protected MQTTSubscriptionStrategy getSubscriptionStrategy() throws IOException {
|
||||
protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
|
||||
if (subsciptionStrategy == null) {
|
||||
synchronized (STRATAGY_FINDER) {
|
||||
if (subsciptionStrategy != null) {
|
||||
|
|
|
@ -211,12 +211,12 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
|
|||
protocolConverter.setPublishDollarTopics(publishDollarTopics);
|
||||
}
|
||||
|
||||
public String getSubscriptionStrategyName() {
|
||||
return protocolConverter != null ? protocolConverter.getSubscriptionStrategyName() : "default";
|
||||
public String getSubscriptionStrategy() {
|
||||
return protocolConverter != null ? protocolConverter.getSubscriptionStrategy() : "default";
|
||||
}
|
||||
|
||||
public void setSubscriptionStrategyName(String name) {
|
||||
protocolConverter.setSubscriptionStrategyName(name);
|
||||
public void setSubscriptionStrategy(String name) {
|
||||
protocolConverter.setSubscriptionStrategy(name);
|
||||
}
|
||||
|
||||
public int getActiveMQSubscriptionPrefetch() {
|
||||
|
|
|
@ -28,7 +28,7 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
|
|||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
protocolConfig = "transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
|
||||
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,19 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||
|
@ -26,20 +39,16 @@ import org.apache.activemq.network.NetworkTestSupport;
|
|||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.fusesource.hawtdispatch.Dispatch;
|
||||
import org.fusesource.mqtt.client.*;
|
||||
import org.fusesource.mqtt.client.BlockingConnection;
|
||||
import org.fusesource.mqtt.client.MQTT;
|
||||
import org.fusesource.mqtt.client.QoS;
|
||||
import org.fusesource.mqtt.client.Topic;
|
||||
import org.fusesource.mqtt.client.Tracer;
|
||||
import org.fusesource.mqtt.codec.MQTTFrame;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.*;
|
||||
import javax.jms.Message;
|
||||
import javax.management.ObjectName;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Created by ceposta
|
||||
* <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
|
||||
|
@ -50,6 +59,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
private int localBrokerMQTTPort = -1;
|
||||
private int remoteBrokerMQTTPort = -1;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
useJmx=true;
|
||||
super.setUp();
|
||||
|
@ -65,6 +75,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
assertFalse(remoteBrokerMQTTPort == -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (remoteBroker.isStarted()) {
|
||||
remoteBroker.stop();
|
||||
|
@ -87,7 +98,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
// even happens. we do that with advisory messages and a latch:
|
||||
CountDownLatch consumerNetworked = listenForConsumersOn(broker);
|
||||
|
||||
|
||||
// create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2)
|
||||
// on the remote broker. this sub should still be there after we disconnect
|
||||
MQTT remoteMqtt = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort);
|
||||
|
@ -100,7 +110,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
|
||||
remoteConn.disconnect();
|
||||
|
||||
|
||||
// now we reconnect the same sub on the local broker, again with clean==0
|
||||
MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
|
||||
BlockingConnection localConn = localMqtt.blockingConnection();
|
||||
|
@ -126,9 +135,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
// would effectively give us duplicates in a distributed topic scenario:
|
||||
remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
|
||||
msg = remoteConn.receive(500, TimeUnit.MILLISECONDS);
|
||||
assertNull("We have duplicate messages across the cluster for a distributed topic",
|
||||
msg);
|
||||
|
||||
assertNull("We have duplicate messages across the cluster for a distributed topic", msg);
|
||||
}
|
||||
|
||||
private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
|
||||
|
@ -161,7 +168,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
|
||||
return latch;
|
||||
}
|
||||
|
||||
|
@ -173,6 +179,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
assertTrue(queueNames[0].toString().contains(queueName));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
|
||||
BrokerViewMBean brokerView = broker.getAdminView();
|
||||
ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers();
|
||||
|
@ -187,7 +194,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
assertEquals(subName, durableSubView.getClientId());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
|
@ -212,7 +218,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
}
|
||||
|
||||
private String getDefaultMQTTTransportConnectorUri(){
|
||||
return "mqtt://localhost:0?transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
|
||||
return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
|
||||
}
|
||||
|
||||
private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
|
||||
|
@ -246,5 +252,4 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue