From fb569e3fbcc11d7a3d8a24a4d43680055c383f5e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 23 Jul 2014 18:46:11 -0400 Subject: [PATCH] Refactored MQTT test suite to use parameterized tests and ensure that the various tests are run on the currently supported transport connectors. --- .../transport/mqtt/AbstractMQTTTest.java | 127 ------- .../transport/mqtt/MQTTAuthTestSupport.java | 115 ++++++ .../transport/mqtt/MQTTAuthTests.java | 174 +++++++++ .../activemq/transport/mqtt/MQTTNioTest.java | 121 ------- .../activemq/transport/mqtt/MQTTSSLTest.java | 96 ----- .../activemq/transport/mqtt/MQTTTest.java | 337 ++++-------------- .../transport/mqtt/MQTTTestSupport.java | 221 ++++++------ .../activemq/transport/mqtt/MQTTTests.java | 71 ---- .../transport/mqtt/PahoMQTNioTTest.java | 119 ------- .../activemq/transport/mqtt/PahoMQTTTest.java | 114 +++++- 10 files changed, 579 insertions(+), 916 deletions(-) delete mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java create mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java create mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java delete mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java delete mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java delete mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java delete mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java deleted file mode 100644 index 13a0e75d1d..0000000000 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import java.io.File; -import java.io.IOException; -import java.security.ProtectionDomain; -import java.util.LinkedList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.util.ByteSequence; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.assertArrayEquals; - -public abstract class AbstractMQTTTest extends AutoFailTestSupport { - protected TransportConnector mqttConnector; - protected TransportConnector openwireConnector; - - public static final int AT_MOST_ONCE =0; - public static final int AT_LEAST_ONCE = 1; - public static final int EXACTLY_ONCE =2; - - public File basedir() throws IOException { - ProtectionDomain protectionDomain = getClass().getProtectionDomain(); - return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile(); - } - - protected BrokerService brokerService; - protected LinkedList exceptions = new LinkedList(); - protected int numberOfMessages; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - exceptions.clear(); - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setAdvisorySupport(false); - brokerService.setUseJmx(false); - this.numberOfMessages = 1000; - } - - @Override - @After - public void tearDown() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - super.tearDown(); - } - - protected String getProtocolScheme() { - return "mqtt"; - } - - protected void addMQTTConnector() throws Exception { - addMQTTConnector(""); - } - - protected void addMQTTConnector(String config) throws Exception { - mqttConnector = brokerService.addConnector(getProtocolScheme()+"://localhost:0?" + config); - } - - protected void addOpenwireConnector() throws Exception { - openwireConnector = brokerService.addConnector("tcp://localhost:0"); - } - - protected void initializeConnection(MQTTClientProvider provider) throws Exception { - provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort()); - } - - protected static interface Task { - public void run() throws Exception; - } - - protected void within(int time, TimeUnit unit, Task task) throws InterruptedException { - long timeMS = unit.toMillis(time); - long deadline = System.currentTimeMillis() + timeMS; - while (true) { - try { - task.run(); - return; - } catch (Throwable e) { - long remaining = deadline - System.currentTimeMillis(); - if( remaining <=0 ) { - if( e instanceof RuntimeException ) { - throw (RuntimeException)e; - } - if( e instanceof Error ) { - throw (Error)e; - } - throw new RuntimeException(e); - } - Thread.sleep(Math.min(timeMS/10, remaining)); - } - } - } - -} \ No newline at end of file diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java new file mode 100644 index 0000000000..bbe4d1ab8c --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.security.TempDestinationAuthorizationEntry; + +/** + * Used as a base class for MQTT tests that require Authentication and Authorization + * to be configured on the Broker. + */ +public class MQTTAuthTestSupport extends MQTTTestSupport { + + @Override + protected BrokerPlugin configureAuthentication() throws Exception { + List users = new ArrayList(); + users.add(new AuthenticationUser("admin", "admin", "users,admins")); + users.add(new AuthenticationUser("user", "password", "users")); + users.add(new AuthenticationUser("guest", "password", "guests")); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); + authenticationPlugin.setAnonymousAccessAllowed(true); + + return authenticationPlugin; + } + + @Override + protected BrokerPlugin configureAuthorization() throws Exception { + + @SuppressWarnings("rawtypes") + List authorizationEntries = new ArrayList(); + + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setQueue(">"); + entry.setRead("admins"); + entry.setWrite("admins"); + entry.setAdmin("admins"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setQueue("USERS.>"); + entry.setRead("users"); + entry.setWrite("users"); + entry.setAdmin("users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setQueue("GUEST.>"); + entry.setRead("guests"); + entry.setWrite("guests,users"); + entry.setAdmin("guests,users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic(">"); + entry.setRead("admins"); + entry.setWrite("admins"); + entry.setAdmin("admins"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("USERS.>"); + entry.setRead("users"); + entry.setWrite("users"); + entry.setAdmin("users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("GUEST.>"); + entry.setRead("guests"); + entry.setWrite("guests,users"); + entry.setAdmin("guests,users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("anonymous"); + entry.setRead("guests,anonymous"); + entry.setWrite("guests,users,anonymous"); + entry.setAdmin("guests,users,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("ActiveMQ.Advisory.>"); + entry.setRead("guests,users,anonymous"); + entry.setWrite("guests,users,anonymous"); + entry.setAdmin("guests,users,anonymous"); + authorizationEntries.add(entry); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + tempEntry.setRead("admins"); + tempEntry.setWrite("admins"); + tempEntry.setAdmin("admins"); + + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap); + + return authorizationPlugin; + } +} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java new file mode 100644 index 0000000000..8c832b7042 --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.net.ProtocolException; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.client.Tracer; +import org.fusesource.mqtt.codec.CONNACK; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests various use cases that require authentication or authorization over MQTT + */ +@RunWith(Parameterized.class) +public class MQTTAuthTests extends MQTTAuthTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTAuthTests.class); + + @Parameters(name= "{index}: scheme({0})") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"mqtt", false}, + {"mqtt+ssl", true}, + {"mqtt+nio", false} + // TODO - Fails {"mqtt+nio+ssl", true} + }); + } + + @Test(timeout = 60 * 1000) + public void testAnonymousUserConnect() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setCleanSession(true); + mqtt.setUserName((String)null); + mqtt.setPassword((String)null); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + LOG.info("Connected as anonymous client"); + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception { + MQTT mqttPub = createMQTTConnection("pub", true); + mqttPub.setUserName("foo"); + mqttPub.setPassword("bar"); + + final AtomicBoolean failed = new AtomicBoolean(); + + mqttPub.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received: {}", frame); + if (frame.messageType() == CONNACK.TYPE) { + CONNACK connAck = new CONNACK(); + try { + connAck.decode(frame); + LOG.info("{}", connAck); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code()); + } catch (ProtocolException e) { + failed.set(true); + fail("Error decoding publish " + e.getMessage()); + } catch (Throwable err) { + failed.set(true); + throw err; + } + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent: {}", frame); + } + }); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + try { + connectionPub.connect(); + fail("Should not be able to connect."); + } catch (Exception e) { + } + + assertFalse("connection should have failed.", failed.get()); + } + + @Test(timeout = 60 * 1000) + public void testFailedSubscription() throws Exception { + final String ANONYMOUS = "anonymous"; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setKeepAlive((short) 2); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String NAMED = "named"; + byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) }); + assertEquals((byte) 0x80, qos[0]); + assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]); + + // validate the subscription by sending a retained message + connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true); + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(ANONYMOUS, new String(msg.getPayload())); + msg.ack(); + + connection.unsubscribe(new String[] { ANONYMOUS }); + qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) }); + assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]); + + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(ANONYMOUS, new String(msg.getPayload())); + msg.ack(); + + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testWildcardRetainedSubscription() throws Exception { + MQTT mqttPub = createMQTTConnection("pub", true); + mqttPub.setUserName("admin"); + mqttPub.setPassword("admin"); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + connectionPub.connect(); + connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true); + + MQTT mqttSub = createMQTTConnection("sub", true); + mqttSub.setUserName("user"); + mqttSub.setPassword("password"); + BlockingConnection connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)}); + Message msg = connectionSub.receive(1, TimeUnit.SECONDS); + assertNull("Shouldn't receive the message", msg); + } +} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java deleted file mode 100644 index 7104e41aae..0000000000 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import java.util.LinkedList; - -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.security.AuthenticationUser; -import org.apache.activemq.security.AuthorizationEntry; -import org.apache.activemq.security.AuthorizationPlugin; -import org.apache.activemq.security.DefaultAuthorizationMap; -import org.apache.activemq.security.SimpleAuthenticationPlugin; -import org.apache.activemq.util.Wait; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(BlockJUnit4ClassRunner.class) -public class MQTTNioTest extends MQTTTest { - protected static final Logger LOG = LoggerFactory.getLogger(MQTTNioTest.class); - - @Rule - public TestName testname = new TestName(); - - @Before - public void setUp() throws Exception { - super.setUp(); - LOG.debug("Starting {}", testname.getMethodName()); - } - - @Override - protected String getProtocolScheme() { - return "mqtt+nio"; - } - - @Test(timeout = 60 * 1000) - public void testPingOnMQTTNIO() throws Exception { - addMQTTConnector("maxInactivityDuration=-1"); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("test-mqtt"); - mqtt.setKeepAlive((short)2); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return connection.isConnected(); - } - })); - - connection.disconnect(); - } - - @Test(timeout = 60 * 1000) - public void testAnonymousUserConnect() throws Exception { - addMQTTConnector(); - configureAuthentication(brokerService); - brokerService.start(); - brokerService.waitUntilStarted(); - MQTT mqtt = createMQTTConnection(); - mqtt.setCleanSession(true); - mqtt.setUserName((String)null); - mqtt.setPassword((String)null); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - - System.out.println("Connected!"); - - connection.disconnect(); - - } - - private void configureAuthentication(BrokerService brokerService) throws Exception { - LinkedList users = new LinkedList(); - users.add(new AuthenticationUser("user1", "user1", "anonymous,user1group")); - final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); - - DefaultAuthorizationMap map = new DefaultAuthorizationMap(); - LinkedList authz = new LinkedList(); - AuthorizationEntry entry = new AuthorizationEntry(); - entry.setDestination(new ActiveMQTopic(">")); - entry.setAdmin("admins"); - entry.setRead("admins,anonymous"); - entry.setWrite("admins"); - authz.add(entry); - map.setAuthorizationEntries(authz); - AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map); - authenticationPlugin.setAnonymousAccessAllowed(true); - - brokerService.setPlugins(new BrokerPlugin[]{ - authenticationPlugin, authorizationPlugin - }); - } - -} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java deleted file mode 100644 index 1eb4ff530b..0000000000 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import java.security.SecureRandom; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import javax.net.ssl.KeyManager; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; - -import org.fusesource.mqtt.client.MQTT; -import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(BlockJUnit4ClassRunner.class) -public class MQTTSSLTest extends MQTTTest { - - private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLTest.class); - - public void setUp() throws Exception { - String basedir = basedir().getPath(); - System.setProperty("javax.net.ssl.trustStore", basedir+"/src/test/resources/client.keystore"); - System.setProperty("javax.net.ssl.trustStorePassword", "password"); - System.setProperty("javax.net.ssl.trustStoreType", "jks"); - System.setProperty("javax.net.ssl.keyStore", basedir+"/src/test/resources/server.keystore"); - System.setProperty("javax.net.ssl.keyStorePassword", "password"); - System.setProperty("javax.net.ssl.keyStoreType", "jks"); - super.setUp(); - } - - @Override - protected String getProtocolScheme() { - return "mqtt+ssl"; - } - - protected MQTT createMQTTConnection() throws Exception { - MQTT mqtt = new MQTT(); - mqtt.setConnectAttemptsMax(1); - mqtt.setReconnectAttemptsMax(0); - mqtt.setTracer(createTracer()); - mqtt.setHost("ssl://localhost:"+mqttConnector.getConnectUri().getPort()); - SSLContext ctx = SSLContext.getInstance("TLS"); - ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); - mqtt.setSslContext(ctx); - return mqtt; - } - - protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception { - MQTT mqtt = createMQTTConnection(); - if (clientId != null) { - mqtt.setClientId(clientId); - } - mqtt.setCleanSession(clean); - return mqtt; - } - - protected void initializeConnection(MQTTClientProvider provider) throws Exception { - SSLContext ctx = SSLContext.getInstance("TLS"); - ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); - provider.setSslContext(ctx); - provider.connect("ssl://localhost:"+mqttConnector.getConnectUri().getPort()); - } - - - - static class DefaultTrustManager implements X509TrustManager { - - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - } -} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 7d5a87ecdb..ec7f1ccde8 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,10 +16,18 @@ */ package org.apache.activemq.transport.mqtt; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; @@ -27,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -35,26 +44,13 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotEquals; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.jaas.GroupPrincipal; -import org.apache.activemq.security.AuthenticationUser; -import org.apache.activemq.security.AuthorizationEntry; -import org.apache.activemq.security.AuthorizationPlugin; -import org.apache.activemq.security.DefaultAuthorizationMap; -import org.apache.activemq.security.SimpleAuthenticationPlugin; -import org.apache.activemq.security.SimpleAuthorizationMap; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; @@ -66,28 +62,42 @@ import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.fusesource.mqtt.codec.PUBLISH; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MQTTTest extends AbstractMQTTTest { +@RunWith(Parameterized.class) +public class MQTTTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); + private static final int NUM_MESSAGES = 250; + + @Parameters(name= "{index}: scheme({0})") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"mqtt", false}, + {"mqtt+ssl", true}, + {"mqtt+nio", false} + // TODO - Fails {"mqtt+nio+ssl", true} + }); + } + @Test(timeout = 60 * 1000) public void testSendAndReceiveMQTT() throws Exception { - addMQTTConnector(); - brokerService.start(); final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); initializeConnection(subscriptionProvider); subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - final CountDownLatch latch = new CountDownLatch(numberOfMessages); + final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES); Thread thread = new Thread(new Runnable() { @Override public void run() { - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { try { byte[] payload = subscriptionProvider.receive(10000); assertNotNull("Should get a message", payload); @@ -105,7 +115,7 @@ public class MQTTTest extends AbstractMQTTTest { final MQTTClientProvider publishProvider = getMQTTClientProvider(); initializeConnection(publishProvider); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Message " + i; publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); } @@ -118,8 +128,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testUnsubscribeMQTT() throws Exception { - addMQTTConnector(); - brokerService.start(); final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); initializeConnection(subscriptionProvider); @@ -127,12 +135,12 @@ public class MQTTTest extends AbstractMQTTTest { subscriptionProvider.subscribe(topic, AT_MOST_ONCE); - final CountDownLatch latch = new CountDownLatch(numberOfMessages / 2); + final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES / 2); Thread thread = new Thread(new Runnable() { @Override public void run() { - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { try { byte[] payload = subscriptionProvider.receive(10000); assertNotNull("Should get a message", payload); @@ -150,9 +158,9 @@ public class MQTTTest extends AbstractMQTTTest { final MQTTClientProvider publishProvider = getMQTTClientProvider(); initializeConnection(publishProvider); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Message " + i; - if (i == numberOfMessages / 2) { + if (i == NUM_MESSAGES / 2) { subscriptionProvider.unsubscribe(topic); } publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE); @@ -171,13 +179,10 @@ public class MQTTTest extends AbstractMQTTTest { * with AT_MOST_ONCE - in MQTT the QoS is always determined by the * message as published - not the wish of the subscriber */ - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); provider.subscribe("foo", EXACTLY_ONCE); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); byte[] message = provider.receive(5000); @@ -189,13 +194,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 2 * 60 * 1000) public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); provider.subscribe("foo", EXACTLY_ONCE); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); byte[] message = provider.receive(5000); @@ -207,13 +209,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 2 * 60 * 1000) public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); provider.subscribe("foo", AT_MOST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); byte[] message = provider.receive(5000); @@ -225,13 +224,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testSendAndReceiveAtMostOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); provider.subscribe("foo", AT_MOST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); byte[] message = provider.receive(5000); @@ -243,13 +239,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 2 * 60 * 1000) public void testSendAndReceiveAtLeastOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); provider.subscribe("foo", AT_LEAST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); byte[] message = provider.receive(5000); @@ -261,8 +254,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testSendAndReceiveExactlyOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); final MQTTClientProvider publisher = getMQTTClientProvider(); initializeConnection(publisher); @@ -270,7 +261,7 @@ public class MQTTTest extends AbstractMQTTTest { initializeConnection(subscriber); subscriber.subscribe("foo", EXACTLY_ONCE); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE); byte[] message = subscriber.receive(5000); @@ -287,9 +278,6 @@ public class MQTTTest extends AbstractMQTTTest { for (int i = 0; i < payload.length; i++) { payload[i] = '2'; } - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider publisher = getMQTTClientProvider(); initializeConnection(publisher); @@ -310,10 +298,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testSendAndReceiveRetainedMessages() throws Exception { - - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider publisher = getMQTTClientProvider(); initializeConnection(publisher); @@ -348,9 +332,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 30 * 1000) public void testValidZeroLengthClientId() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); mqtt.setClientId(""); mqtt.setCleanSession(true); @@ -362,9 +343,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 2 * 60 * 1000) public void testMQTTPathPatterns() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); mqtt.setClientId(""); mqtt.setCleanSession(true); @@ -434,9 +412,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testMQTTRetainQoS() throws Exception { - addMQTTConnector(); - brokerService.start(); - String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" }; for (int i = 0; i < topics.length; i++) { final String topic = topics[i]; @@ -480,9 +455,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testDuplicateSubscriptions() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); mqtt.setKeepAlive((short) 2); @@ -528,9 +500,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 120 * 1000) public void testRetainedMessage() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); mqtt.setKeepAlive((short) 2); @@ -597,59 +566,8 @@ public class MQTTTest extends AbstractMQTTTest { } } - @Test(timeout = 60 * 1000) - public void testFailedSubscription() throws Exception { - addMQTTConnector(); - - final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(); - authenticationPlugin.setAnonymousAccessAllowed(true); - - final String ANONYMOUS = "anonymous"; - authenticationPlugin.setAnonymousGroup(ANONYMOUS); - final DefaultAuthorizationMap map = new DefaultAuthorizationMap(); - // only one authorized destination, anonymous for anonymous group! - map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS)); - final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map, map, map)); - - brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin }); - brokerService.start(); - - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("foo"); - mqtt.setKeepAlive((short) 2); - - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - - final String NAMED = "named"; - byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) }); - assertEquals((byte) 0x80, qos[0]); - assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]); - - // validate the subscription by sending a retained message - connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true); - Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(ANONYMOUS, new String(msg.getPayload())); - msg.ack(); - - connection.unsubscribe(new String[] { ANONYMOUS }); - qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) }); - assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]); - - msg = connection.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(ANONYMOUS, new String(msg.getPayload())); - msg.ack(); - - connection.disconnect(); - } - @Test(timeout = 60 * 1000) public void testUniqueMessageIds() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); mqtt.setKeepAlive((short) 2); @@ -737,9 +655,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testResendMessageId() throws Exception { - addMQTTConnector("trace=true"); - brokerService.start(); - final MQTT mqtt = createMQTTConnection("resend", false); mqtt.setKeepAlive((short) 5); @@ -806,9 +721,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 90 * 1000) public void testPacketIdGeneratorNonCleanSession() throws Exception { - addMQTTConnector("trace=true"); - brokerService.start(); - final MQTT mqtt = createMQTTConnection("nonclean-packetid", false); mqtt.setKeepAlive((short) 15); @@ -882,9 +794,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 90 * 1000) public void testPacketIdGeneratorCleanSession() throws Exception { - addMQTTConnector("trace=true"); - brokerService.start(); - final String[] cleanClientIds = new String[] { "", "clean-packetid", null }; final Map publishMap = new ConcurrentHashMap(); MQTT[] mqtts = new MQTT[cleanClientIds.length]; @@ -944,9 +853,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testClientConnectionFailure() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection("reconnect", false); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -983,9 +889,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testCleanSession() throws Exception { - addMQTTConnector(); - brokerService.start(); - final String CLIENTID = "cleansession"; final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false); BlockingConnection notClean = mqttNotClean.blockingConnection(); @@ -1025,10 +928,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { - addMQTTConnector(); - TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); final String DESTINATION_NAME = "foo.*"; @@ -1037,7 +936,7 @@ public class MQTTTest extends AbstractMQTTTest { final String RETAINED = "RETAINED"; provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true); - ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); // MUST set to true to receive retained messages activeMQConnection.setUseRetroactiveConsumer(true); activeMQConnection.start(); @@ -1052,7 +951,7 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Test Message: " + i; provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); message = (ActiveMQMessage) consumer.receive(5000); @@ -1067,13 +966,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 2 * 60 * 1000) public void testSendJMSReceiveMQTT() throws Exception { - addMQTTConnector(); - TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); - ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); activeMQConnection.setUseRetroactiveConsumer(true); activeMQConnection.start(); Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1094,7 +990,7 @@ public class MQTTTest extends AbstractMQTTTest { assertNotNull("Should get retained message", message); assertEquals(RETAINED, new String(message)); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "This is Test Message: " + i; sendMessage = s.createTextMessage(payload); producer.send(sendMessage); @@ -1109,8 +1005,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testPingKeepsInactivityMonitorAlive() throws Exception { - addMQTTConnector(); - brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); mqtt.setKeepAlive((short) 2); @@ -1130,8 +1024,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testTurnOffInactivityMonitor() throws Exception { - addMQTTConnector("transport.useInactivityMonitor=false"); - brokerService.start(); + stopBroker(); + protocolConfig = "transport.useInactivityMonitor=false"; + startBroker(); + MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo3"); mqtt.setKeepAlive((short) 2); @@ -1151,13 +1047,8 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 30 * 10000) public void testJmsMapping() throws Exception { - addMQTTConnector(); - addOpenwireConnector(); - brokerService.start(); - // start up jms consumer - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + openwireConnector.getConnectUri().getPort()); - Connection jmsConn = factory.createConnection(); + Connection jmsConn = cf.createConnection(); Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createTopic("test.foo"); MessageConsumer consumer = session.createConsumer(dest); @@ -1204,8 +1095,6 @@ public class MQTTTest extends AbstractMQTTTest { payload[i] = '2'; } - addMQTTConnector(); - brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setClientId("MQTT-Client"); mqtt.setCleanSession(false); @@ -1245,10 +1134,7 @@ public class MQTTTest extends AbstractMQTTTest { int numberOfRuns = 100; int messagesPerRun = 2; - addMQTTConnector("trace=true"); - brokerService.start(); final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true); - final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false); final BlockingConnection connectionPub = mqttPub.blockingConnection(); @@ -1298,9 +1184,10 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 30 * 1000) public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception { - // default keep alive in milliseconds - addMQTTConnector("transport.defaultKeepAlive=2000"); - brokerService.start(); + stopBroker(); + protocolConfig = "transport.defaultKeepAlive=2000"; + startBroker(); + MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); mqtt.setKeepAlive((short) 0); @@ -1318,9 +1205,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testReuseConnection() throws Exception { - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); mqtt.setClientId("Test-Client"); @@ -1340,9 +1224,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception { - addMQTTConnector(); - brokerService.setPersistent(true); - brokerService.start(); Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) }; MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true); @@ -1395,8 +1276,6 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout = 60 * 1000) public void testMQTT311Connection() throws Exception { - addMQTTConnector(); - brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); mqtt.setVersion("3.1.1"); @@ -1405,64 +1284,8 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } - @Test(timeout = 60 * 1000) - public void testWildcardRetainedSubscription() throws Exception { - addMQTTConnector(); - - LinkedList users = new LinkedList(); - users.add(new AuthenticationUser("user", "user", "users")); - users.add(new AuthenticationUser("admin", "admin", "admins")); - final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); - - - DefaultAuthorizationMap map = new DefaultAuthorizationMap(); - LinkedList authz = new LinkedList(); - - AuthorizationEntry entryOne = new AuthorizationEntry(); - entryOne.setDestination(new ActiveMQTopic("one")); - entryOne.setAdmin("admins"); - entryOne.setRead("admins"); - entryOne.setWrite("admins"); - authz.add(entryOne); - - AuthorizationEntry entryTwo = new AuthorizationEntry(); - entryTwo.setDestination(new ActiveMQTopic("two")); - entryTwo.setAdmin("users"); - entryTwo.setRead("users"); - entryTwo.setWrite("users"); - authz.add(entryTwo); - - map.setAuthorizationEntries(authz); - AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map); - - brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin }); - - brokerService.start(); - - MQTT mqttPub = createMQTTConnection("pub", true); - mqttPub.setUserName("admin"); - mqttPub.setPassword("admin"); - - BlockingConnection connectionPub = mqttPub.blockingConnection(); - connectionPub.connect(); - connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true); - - MQTT mqttSub = createMQTTConnection("sub", true); - mqttSub.setUserName("user"); - mqttSub.setPassword("user"); - BlockingConnection connectionSub = mqttSub.blockingConnection(); - connectionSub.connect(); - connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)}); - Message msg = connectionSub.receive(1, TimeUnit.SECONDS); - assertNull("Shouldn't receive the message", msg); - } - @Test(timeout = 60 * 1000) public void testActiveMQRecoveryPolicy() throws Exception { - addMQTTConnector(); - - brokerService.start(); - // test with ActiveMQ LastImageSubscriptionRecoveryPolicy final PolicyMap policyMap = new PolicyMap(); final PolicyEntry policyEntry = new PolicyEntry(); @@ -1507,49 +1330,25 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]); } - @Override - protected String getProtocolScheme() { - return "mqtt"; - } + @Test(timeout = 60 * 1000) + public void testPingOnMQTT() throws Exception { + stopBroker(); + protocolConfig = "maxInactivityDuration=-1"; + startBroker(); - protected MQTTClientProvider getMQTTClientProvider() { - return new FuseMQQTTClientProvider(); - } - - protected MQTT createMQTTConnection() throws Exception { - return createMQTTConnection(null, false); - } - - protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception { - MQTT mqtt = new MQTT(); - mqtt.setConnectAttemptsMax(1); - mqtt.setReconnectAttemptsMax(0); - mqtt.setTracer(createTracer()); - if (clientId != null) { - mqtt.setClientId(clientId); - } - mqtt.setCleanSession(clean); - mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort()); - // shut off connect retry - return mqtt; - } - - protected Tracer createTracer() { - return new Tracer() { - @Override - public void onReceive(MQTTFrame frame) { - LOG.info("Client Received:\n" + frame); - } + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("test-mqtt"); + mqtt.setKeepAlive((short)2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() { @Override - public void onSend(MQTTFrame frame) { - LOG.info("Client Sent:\n" + frame); + public boolean isSatisified() throws Exception { + return connection.isConnected(); } + })); - @Override - public void debug(String message, Object... args) { - LOG.info(String.format(message, args)); - } - }; + connection.disconnect(); } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java index 7fad9462b4..4571a36e02 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java @@ -20,14 +20,20 @@ package org.apache.activemq.transport.mqtt; import java.io.File; import java.io.IOException; import java.security.ProtectionDomain; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.LinkedList; -import java.util.List; import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; @@ -36,14 +42,6 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; -import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.security.AuthenticationUser; -import org.apache.activemq.security.AuthorizationEntry; -import org.apache.activemq.security.AuthorizationPlugin; -import org.apache.activemq.security.DefaultAuthorizationMap; -import org.apache.activemq.security.SimpleAuthenticationPlugin; -import org.apache.activemq.security.TempDestinationAuthorizationEntry; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Tracer; @@ -52,6 +50,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; +import org.junit.runners.Parameterized.Parameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +60,17 @@ public class MQTTTestSupport { protected BrokerService brokerService; protected int port; - protected int sslPort; - protected int nioPort; - protected int nioSslPort; protected String jmsUri = "vm://localhost"; protected ActiveMQConnectionFactory cf; protected LinkedList exceptions = new LinkedList(); - protected int numberOfMessages; + protected boolean persistent; + protected String protocolConfig; + + @Parameter(0) + public String protocolScheme; + + @Parameter(1) + public boolean useSSL; public static final int AT_MOST_ONCE = 0; public static final int AT_LEAST_ONCE = 1; @@ -80,18 +83,14 @@ public class MQTTTestSupport { return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile(); } - public static void main(String[] args) throws Exception { - final MQTTTestSupport s = new MQTTTestSupport(); + public MQTTTestSupport() { + this.protocolScheme = "mqtt"; + this.useSSL = false; + } - s.sslPort = 5675; - s.port = 5676; - s.nioPort = 5677; - s.nioSslPort = 5678; - - s.startBroker(); - while(true) { - Thread.sleep(100000); - } + public MQTTTestSupport(String connectorScheme, boolean useSsl) { + this.protocolScheme = connectorScheme; + this.useSSL = useSsl; } public String getName() { @@ -100,8 +99,16 @@ public class MQTTTestSupport { @Before public void setUp() throws Exception { + + String basedir = basedir().getPath(); + System.setProperty("javax.net.ssl.trustStore", basedir + "/src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", basedir + "/src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + exceptions.clear(); - numberOfMessages = 1000; startBroker(); } @@ -162,84 +169,16 @@ public class MQTTTestSupport { brokerService = new BrokerService(); brokerService.setPersistent(isPersistent()); brokerService.setAdvisorySupport(false); - brokerService.setSchedulerSupport(true); + brokerService.setSchedulerSupport(isSchedulerSupportEnabled()); brokerService.setPopulateJMSXUserID(true); - brokerService.setSchedulerSupport(true); - - JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl(); - jobStore.setDirectory(new File("activemq-data")); - - brokerService.setJobSchedulerStore(jobStore); } protected BrokerPlugin configureAuthentication() throws Exception { - List users = new ArrayList(); - users.add(new AuthenticationUser("system", "manager", "users,admins")); - users.add(new AuthenticationUser("user", "password", "users")); - users.add(new AuthenticationUser("guest", "password", "guests")); - SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); - - return authenticationPlugin; + return null; } protected BrokerPlugin configureAuthorization() throws Exception { - - @SuppressWarnings("rawtypes") - List authorizationEntries = new ArrayList(); - - AuthorizationEntry entry = new AuthorizationEntry(); - entry.setQueue(">"); - entry.setRead("admins"); - entry.setWrite("admins"); - entry.setAdmin("admins"); - authorizationEntries.add(entry); - entry = new AuthorizationEntry(); - entry.setQueue("USERS.>"); - entry.setRead("users"); - entry.setWrite("users"); - entry.setAdmin("users"); - authorizationEntries.add(entry); - entry = new AuthorizationEntry(); - entry.setQueue("GUEST.>"); - entry.setRead("guests"); - entry.setWrite("guests,users"); - entry.setAdmin("guests,users"); - authorizationEntries.add(entry); - entry = new AuthorizationEntry(); - entry.setTopic(">"); - entry.setRead("admins"); - entry.setWrite("admins"); - entry.setAdmin("admins"); - authorizationEntries.add(entry); - entry = new AuthorizationEntry(); - entry.setTopic("USERS.>"); - entry.setRead("users"); - entry.setWrite("users"); - entry.setAdmin("users"); - authorizationEntries.add(entry); - entry = new AuthorizationEntry(); - entry.setTopic("GUEST.>"); - entry.setRead("guests"); - entry.setWrite("guests,users"); - entry.setAdmin("guests,users"); - authorizationEntries.add(entry); - entry = new AuthorizationEntry(); - entry.setTopic("ActiveMQ.Advisory.>"); - entry.setRead("guests,users"); - entry.setWrite("guests,users"); - entry.setAdmin("guests,users"); - authorizationEntries.add(entry); - - TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); - tempEntry.setRead("admins"); - tempEntry.setWrite("admins"); - tempEntry.setAdmin("admins"); - - DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries); - authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); - AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap); - - return authorizationPlugin; + return null; } protected void applyBrokerPolicies() throws Exception { @@ -255,8 +194,16 @@ public class MQTTTestSupport { // Overrides of this method can add additional configuration options or add multiple // MQTT transport connectors as needed, the port variable is always supposed to be // assigned the primary MQTT connector's port. - TransportConnector connector = brokerService.addConnector(getProtocolScheme() + "://0.0.0.0:" + port); - port = connector.getConnectUri().getPort(); + + StringBuilder connectorURI = new StringBuilder(); + connectorURI.append(getProtocolScheme()); + connectorURI.append("://0.0.0.0:").append(port); + if (protocolConfig != null && !protocolConfig.isEmpty()) { + connectorURI.append("?").append(protocolConfig); + } + + port = brokerService.addConnector(connectorURI.toString()).getConnectUri().getPort(); + LOG.info("Added connector {} to broker", getProtocolScheme()); } public void stopBroker() throws Exception { @@ -299,7 +246,7 @@ public class MQTTTestSupport { /** * Initialize an MQTTClientProvider instance. By default this method uses the port that's - * assigned to be the TCP based port using the base version of addMQTTConnector. A sbuclass + * assigned to be the TCP based port using the base version of addMQTTConnector. A subclass * can either change the value of port or override this method to assign the correct port. * * @param provider @@ -308,14 +255,41 @@ public class MQTTTestSupport { * @throws Exception if an error occurs during initialization. */ protected void initializeConnection(MQTTClientProvider provider) throws Exception { - provider.connect("tcp://localhost:" + port); + if (!isUseSSL()) { + provider.connect("tcp://localhost:" + port); + } else { + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[] { new DefaultTrustManager() }, new SecureRandom()); + provider.setSslContext(ctx); + provider.connect("ssl://localhost:" + port); + } } - protected String getProtocolScheme() { - return "mqtt"; + public String getProtocolScheme() { + return protocolScheme; } - protected boolean isPersistent() { + public void setProtocolScheme(String scheme) { + this.protocolScheme = scheme; + } + + public boolean isUseSSL() { + return this.useSSL; + } + + public void setUseSSL(boolean useSSL) { + this.useSSL = useSSL; + } + + public boolean isPersistent() { + return persistent; + } + + public int getPort() { + return this.port; + } + + public boolean isSchedulerSupportEnabled() { return false; } @@ -355,6 +329,14 @@ public class MQTTTestSupport { } protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception { + if (isUseSSL()) { + return createMQTTSslConnection(clientId, clean); + } else { + return createMQTTTcpConnection(clientId, clean); + } + } + + private MQTT createMQTTTcpConnection(String clientId, boolean clean) throws Exception { MQTT mqtt = new MQTT(); mqtt.setConnectAttemptsMax(1); mqtt.setReconnectAttemptsMax(0); @@ -367,6 +349,23 @@ public class MQTTTestSupport { return mqtt; } + private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setTracer(createTracer()); + mqtt.setHost("ssl://localhost:" + port); + if (clientId != null) { + mqtt.setClientId(clientId); + } + mqtt.setCleanSession(clean); + + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[] { new DefaultTrustManager() }, new SecureRandom()); + mqtt.setSslContext(ctx); + return mqtt; + } + protected Tracer createTracer() { return new Tracer() { @Override @@ -385,4 +384,20 @@ public class MQTTTestSupport { } }; } + + static class DefaultTrustManager implements X509TrustManager { + + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java deleted file mode 100644 index ba2e964a46..0000000000 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.net.ProtocolException; - -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.Tracer; -import org.fusesource.mqtt.codec.CONNACK; -import org.fusesource.mqtt.codec.MQTTFrame; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MQTTTests extends MQTTTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(MQTTTests.class); - - @Test(timeout = 60 * 1000) - public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception { - MQTT mqttPub = createMQTTConnection("pub", true); - mqttPub.setUserName("admin"); - mqttPub.setPassword("admin"); - - mqttPub.setTracer(new Tracer() { - @Override - public void onReceive(MQTTFrame frame) { - LOG.info("Client received: {}", frame); - if (frame.messageType() == CONNACK.TYPE) { - CONNACK connAck = new CONNACK(); - try { - connAck.decode(frame); - LOG.info("{}", connAck); - assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code()); - } catch (ProtocolException e) { - fail("Error decoding publish " + e.getMessage()); - } - } - } - - @Override - public void onSend(MQTTFrame frame) { - LOG.info("Client sent: {}", frame); - } - }); - - BlockingConnection connectionPub = mqttPub.blockingConnection(); - try { - connectionPub.connect(); - fail("Should not be able to connect."); - } catch (Exception e) {} - } -} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java deleted file mode 100644 index 589f3bb1c5..0000000000 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.TransportConnector; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PahoMQTNioTTest extends PahoMQTTTest { - - private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class); - - @Override - protected String getProtocolScheme() { - return "mqtt+nio"; - } - - @Test(timeout = 300000) - public void testLotsOfClients() throws Exception { - - final int CLIENTS = Integer.getInteger("PahoMQTNioTTest.CLIENTS", 100); - LOG.info("Using: " + CLIENTS + " clients"); - addMQTTConnector(); - TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - - ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); - activeMQConnection.start(); - Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = s.createConsumer(s.createTopic("test")); - - final AtomicInteger receiveCounter = new AtomicInteger(); - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - receiveCounter.incrementAndGet(); - } - }); - - final AtomicReference asyncError = new AtomicReference(); - final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS); - final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS); - final CountDownLatch sendBarrier = new CountDownLatch(1); - for (int i = 0; i < CLIENTS; i++) { - Thread.sleep(10); - new Thread(null, null, "client:" + i) { - @Override - public void run() { - try { - MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(), - new MemoryPersistence()); - client.connect(); - connectedDoneLatch.countDown(); - sendBarrier.await(); - for (int i = 0; i < 10; i++) { - Thread.sleep(1000); - client.publish("test", "hello".getBytes(), 1, false); - } - client.disconnect(); - client.close(); - } catch (Throwable e) { - e.printStackTrace(); - asyncError.set(e); - } finally { - disconnectDoneLatch.countDown(); - } - } - }.start(); - } - - connectedDoneLatch.await(); - assertNull("Async error: " + asyncError.get(), asyncError.get()); - sendBarrier.countDown(); - - LOG.info("All clients connected... waiting to receive sent messages..."); - - // We should eventually get all the messages. - within(30, TimeUnit.SECONDS, new Task() { - @Override - public void run() throws Exception { - assertTrue(receiveCounter.get() == CLIENTS * 10); - } - }); - - LOG.info("All messages received."); - - disconnectDoneLatch.await(); - assertNull("Async error: " + asyncError.get(), asyncError.get()); - } -} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index 3af4b53ee6..17305be82e 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -16,31 +16,125 @@ */ package org.apache.activemq.transport.mqtt; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.TransportConnector; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class PahoMQTTTest extends AbstractMQTTTest { +@RunWith(Parameterized.class) +public class PahoMQTTTest extends MQTTTestSupport { - @Test(timeout=300000) - public void testSendAndReceiveMQTT() throws Exception { - addMQTTConnector(); - TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); + private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); - ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + @Parameters(name= "{index}: scheme({0})") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"mqtt", false}, + {"mqtt+nio", false} + }); + } + + @Test(timeout = 300000) + public void testLotsOfClients() throws Exception { + + final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100); + LOG.info("Using: {} clients", CLIENTS); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); activeMQConnection.start(); Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = s.createConsumer(s.createTopic("test")); - MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid", new MemoryPersistence()); + final AtomicInteger receiveCounter = new AtomicInteger(); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + receiveCounter.incrementAndGet(); + } + }); + + final AtomicReference asyncError = new AtomicReference(); + final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS); + final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS); + final CountDownLatch sendBarrier = new CountDownLatch(1); + for (int i = 0; i < CLIENTS; i++) { + Thread.sleep(10); + new Thread(null, null, "client:" + i) { + @Override + public void run() { + try { + MqttClient client = new MqttClient("tcp://localhost:" + getPort(), + Thread.currentThread().getName(), + new MemoryPersistence()); + client.connect(); + connectedDoneLatch.countDown(); + sendBarrier.await(); + for (int i = 0; i < 10; i++) { + Thread.sleep(1000); + client.publish("test", "hello".getBytes(), 1, false); + } + client.disconnect(); + client.close(); + } catch (Throwable e) { + e.printStackTrace(); + asyncError.set(e); + } finally { + disconnectDoneLatch.countDown(); + } + } + }.start(); + } + + connectedDoneLatch.await(); + assertNull("Async error: " + asyncError.get(), asyncError.get()); + sendBarrier.countDown(); + + LOG.info("All clients connected... waiting to receive sent messages..."); + + // We should eventually get all the messages. + within(30, TimeUnit.SECONDS, new Task() { + @Override + public void run() throws Exception { + assertTrue(receiveCounter.get() == CLIENTS * 10); + } + }); + + LOG.info("All messages received."); + + disconnectDoneLatch.await(); + assertNull("Async error: " + asyncError.get(), asyncError.get()); + } + + @Test(timeout=300000) + public void testSendAndReceiveMQTT() throws Exception { + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = s.createConsumer(s.createTopic("test")); + + MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence()); client.connect(); client.publish("test", "hello".getBytes(), 1, false);