diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java similarity index 98% rename from activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java rename to activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java index d32906614c..f7e6c481ee 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java @@ -26,7 +26,7 @@ import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import static org.fusesource.hawtbuf.UTF8Buffer.utf8; -class FuseMQQTTClientProvider implements MQTTClientProvider { +public class FuseMQTTClientProvider implements MQTTClientProvider { private final MQTT mqtt = new MQTT(); private BlockingConnection connection; @Override diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java index 558873ffc2..19aac52625 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java @@ -339,7 +339,7 @@ public class MQTTTestSupport { } protected MQTTClientProvider getMQTTClientProvider() { - return new FuseMQQTTClientProvider(); + return new FuseMQTTClientProvider(); } protected MQTT createMQTTConnection() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java new file mode 100644 index 0000000000..bf24971117 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.network.NetworkTestSupport; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.apache.commons.lang.ArrayUtils; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.mqtt.client.*; +import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.jms.Message; +import javax.management.ObjectName; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Created by ceposta + * "); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + System.out.println("got message! " + message); + latch.countDown(); + // shutdown this connection + Dispatch.getGlobalQueue().execute(new Runnable() { + @Override + public void run() { + try { + session.close(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + } + }); + + + return latch; + } + + + private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception { + BrokerViewMBean brokerView = broker.getAdminView(); + ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers(); + ObjectName[] inactiveDurableSubs = brokerView.getInactiveDurableTopicSubscribers(); + ObjectName[] allDurables = (ObjectName[]) ArrayUtils.addAll(activeDurableSubs, inactiveDurableSubs); + assertEquals(1, allDurables.length); + + // at this point our assertions should prove that we have only on durable sub + DurableSubscriptionViewMBean durableSubView = (DurableSubscriptionViewMBean) + broker.getManagementContext().newProxyInstance(allDurables[0], DurableSubscriptionViewMBean.class, true); + + assertEquals(subName, durableSubView.getClientId()); + } + + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setPersistent(true); + broker.setBrokerName("local"); + broker.setDataDirectory("target/activemq-data"); + broker.setDeleteAllMessagesOnStartup(true); + TransportConnector tc = broker.addConnector("mqtt://localhost:0"); + localBrokerMQTTPort = tc.getConnectUri().getPort(); + return broker; + } + + @Override + protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception { + BrokerService broker = super.createRemoteBroker(persistenceAdapter); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.setDataDirectory("target/activemq-data"); + TransportConnector tc = broker.addConnector("mqtt://localhost:0"); + remoteBrokerMQTTPort = tc.getConnectUri().getPort(); + return broker; + } + + private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setTracer(createTracer()); + if (clientId != null) { + mqtt.setClientId(clientId); + } + mqtt.setCleanSession(clean); + mqtt.setHost("localhost", port); + return mqtt; + } + + protected Tracer createTracer() { + return new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client Received:\n" + frame); + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client Sent:\n" + frame); + } + + @Override + public void debug(String message, Object... args) { + LOG.info(String.format(message, args)); + } + }; + } + +}