Refactored MQTT test suite to use parameterized tests and ensure that

the various tests are run on the currently supported transport
connectors.
This commit is contained in:
Timothy Bish 2014-07-23 18:46:11 -04:00
parent 93f686c5cf
commit fb569e3fbc
10 changed files with 579 additions and 916 deletions

View File

@ -1,127 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.File;
import java.io.IOException;
import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
public abstract class AbstractMQTTTest extends AutoFailTestSupport {
protected TransportConnector mqttConnector;
protected TransportConnector openwireConnector;
public static final int AT_MOST_ONCE =0;
public static final int AT_LEAST_ONCE = 1;
public static final int EXACTLY_ONCE =2;
public File basedir() throws IOException {
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
protected BrokerService brokerService;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
this.numberOfMessages = 1000;
}
@Override
@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
super.tearDown();
}
protected String getProtocolScheme() {
return "mqtt";
}
protected void addMQTTConnector() throws Exception {
addMQTTConnector("");
}
protected void addMQTTConnector(String config) throws Exception {
mqttConnector = brokerService.addConnector(getProtocolScheme()+"://localhost:0?" + config);
}
protected void addOpenwireConnector() throws Exception {
openwireConnector = brokerService.addConnector("tcp://localhost:0");
}
protected void initializeConnection(MQTTClientProvider provider) throws Exception {
provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
}
protected static interface Task {
public void run() throws Exception;
}
protected void within(int time, TimeUnit unit, Task task) throws InterruptedException {
long timeMS = unit.toMillis(time);
long deadline = System.currentTimeMillis() + timeMS;
while (true) {
try {
task.run();
return;
} catch (Throwable e) {
long remaining = deadline - System.currentTimeMillis();
if( remaining <=0 ) {
if( e instanceof RuntimeException ) {
throw (RuntimeException)e;
}
if( e instanceof Error ) {
throw (Error)e;
}
throw new RuntimeException(e);
}
Thread.sleep(Math.min(timeMS/10, remaining));
}
}
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
/**
* Used as a base class for MQTT tests that require Authentication and Authorization
* to be configured on the Broker.
*/
public class MQTTAuthTestSupport extends MQTTTestSupport {
@Override
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("admin", "admin", "users,admins"));
users.add(new AuthenticationUser("user", "password", "users"));
users.add(new AuthenticationUser("guest", "password", "guests"));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
authenticationPlugin.setAnonymousAccessAllowed(true);
return authenticationPlugin;
}
@Override
protected BrokerPlugin configureAuthorization() throws Exception {
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("anonymous");
entry.setRead("guests,anonymous");
entry.setWrite("guests,users,anonymous");
entry.setAdmin("guests,users,anonymous");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("ActiveMQ.Advisory.>");
entry.setRead("guests,users,anonymous");
entry.setWrite("guests,users,anonymous");
entry.setAdmin("guests,users,anonymous");
authorizationEntries.add(entry);
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("admins");
tempEntry.setWrite("admins");
tempEntry.setAdmin("admins");
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
return authorizationPlugin;
}
}

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.net.ProtocolException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests various use cases that require authentication or authorization over MQTT
*/
@RunWith(Parameterized.class)
public class MQTTAuthTests extends MQTTAuthTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTAuthTests.class);
@Parameters(name= "{index}: scheme({0})")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"mqtt", false},
{"mqtt+ssl", true},
{"mqtt+nio", false}
// TODO - Fails {"mqtt+nio+ssl", true}
});
}
@Test(timeout = 60 * 1000)
public void testAnonymousUserConnect() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setCleanSession(true);
mqtt.setUserName((String)null);
mqtt.setPassword((String)null);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
LOG.info("Connected as anonymous client");
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
MQTT mqttPub = createMQTTConnection("pub", true);
mqttPub.setUserName("foo");
mqttPub.setPassword("bar");
final AtomicBoolean failed = new AtomicBoolean();
mqttPub.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received: {}", frame);
if (frame.messageType() == CONNACK.TYPE) {
CONNACK connAck = new CONNACK();
try {
connAck.decode(frame);
LOG.info("{}", connAck);
assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
} catch (ProtocolException e) {
failed.set(true);
fail("Error decoding publish " + e.getMessage());
} catch (Throwable err) {
failed.set(true);
throw err;
}
}
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent: {}", frame);
}
});
BlockingConnection connectionPub = mqttPub.blockingConnection();
try {
connectionPub.connect();
fail("Should not be able to connect.");
} catch (Exception e) {
}
assertFalse("connection should have failed.", failed.get());
}
@Test(timeout = 60 * 1000)
public void testFailedSubscription() throws Exception {
final String ANONYMOUS = "anonymous";
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String NAMED = "named";
byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
assertEquals((byte) 0x80, qos[0]);
assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);
// validate the subscription by sending a retained message
connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack();
connection.unsubscribe(new String[] { ANONYMOUS });
qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) });
assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack();
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testWildcardRetainedSubscription() throws Exception {
MQTT mqttPub = createMQTTConnection("pub", true);
mqttPub.setUserName("admin");
mqttPub.setPassword("admin");
BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect();
connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
MQTT mqttSub = createMQTTConnection("sub", true);
mqttSub.setUserName("user");
mqttSub.setPassword("password");
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
assertNull("Shouldn't receive the message", msg);
}
}

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.LinkedList;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(BlockJUnit4ClassRunner.class)
public class MQTTNioTest extends MQTTTest {
protected static final Logger LOG = LoggerFactory.getLogger(MQTTNioTest.class);
@Rule
public TestName testname = new TestName();
@Before
public void setUp() throws Exception {
super.setUp();
LOG.debug("Starting {}", testname.getMethodName());
}
@Override
protected String getProtocolScheme() {
return "mqtt+nio";
}
@Test(timeout = 60 * 1000)
public void testPingOnMQTTNIO() throws Exception {
addMQTTConnector("maxInactivityDuration=-1");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("test-mqtt");
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
}));
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testAnonymousUserConnect() throws Exception {
addMQTTConnector();
configureAuthentication(brokerService);
brokerService.start();
brokerService.waitUntilStarted();
MQTT mqtt = createMQTTConnection();
mqtt.setCleanSession(true);
mqtt.setUserName((String)null);
mqtt.setPassword((String)null);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
System.out.println("Connected!");
connection.disconnect();
}
private void configureAuthentication(BrokerService brokerService) throws Exception {
LinkedList<AuthenticationUser> users = new LinkedList<AuthenticationUser>();
users.add(new AuthenticationUser("user1", "user1", "anonymous,user1group"));
final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
DefaultAuthorizationMap map = new DefaultAuthorizationMap();
LinkedList<DestinationMapEntry> authz = new LinkedList<DestinationMapEntry>();
AuthorizationEntry entry = new AuthorizationEntry();
entry.setDestination(new ActiveMQTopic(">"));
entry.setAdmin("admins");
entry.setRead("admins,anonymous");
entry.setWrite("admins");
authz.add(entry);
map.setAuthorizationEntries(authz);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map);
authenticationPlugin.setAnonymousAccessAllowed(true);
brokerService.setPlugins(new BrokerPlugin[]{
authenticationPlugin, authorizationPlugin
});
}
}

View File

@ -1,96 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.fusesource.mqtt.client.MQTT;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(BlockJUnit4ClassRunner.class)
public class MQTTSSLTest extends MQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLTest.class);
public void setUp() throws Exception {
String basedir = basedir().getPath();
System.setProperty("javax.net.ssl.trustStore", basedir+"/src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", basedir+"/src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
}
@Override
protected String getProtocolScheme() {
return "mqtt+ssl";
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setTracer(createTracer());
mqtt.setHost("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(ctx);
return mqtt;
}
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = createMQTTConnection();
if (clientId != null) {
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
return mqtt;
}
protected void initializeConnection(MQTTClientProvider provider) throws Exception {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
provider.setSslContext(ctx);
provider.connect("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
}
static class DefaultTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}

View File

@ -16,10 +16,18 @@
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -27,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -35,26 +44,13 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
@ -66,28 +62,42 @@ import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTTest extends AbstractMQTTTest {
@RunWith(Parameterized.class)
public class MQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
private static final int NUM_MESSAGES = 250;
@Parameters(name= "{index}: scheme({0})")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"mqtt", false},
{"mqtt+ssl", true},
{"mqtt+nio", false}
// TODO - Fails {"mqtt+nio+ssl", true}
});
}
@Test(timeout = 60 * 1000)
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNotNull("Should get a message", payload);
@ -105,7 +115,7 @@ public class MQTTTest extends AbstractMQTTTest {
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Message " + i;
publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
}
@ -118,8 +128,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testUnsubscribeMQTT() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
@ -127,12 +135,12 @@ public class MQTTTest extends AbstractMQTTTest {
subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(numberOfMessages / 2);
final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES / 2);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNotNull("Should get a message", payload);
@ -150,9 +158,9 @@ public class MQTTTest extends AbstractMQTTTest {
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Message " + i;
if (i == numberOfMessages / 2) {
if (i == NUM_MESSAGES / 2) {
subscriptionProvider.unsubscribe(topic);
}
publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
@ -171,13 +179,10 @@ public class MQTTTest extends AbstractMQTTTest {
* with AT_MOST_ONCE - in MQTT the QoS is always determined by the
* message as published - not the wish of the subscriber
*/
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo", EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
byte[] message = provider.receive(5000);
@ -189,13 +194,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 2 * 60 * 1000)
public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo", EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
@ -207,13 +209,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 2 * 60 * 1000)
public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo", AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
@ -225,13 +224,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo", AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
byte[] message = provider.receive(5000);
@ -243,13 +239,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 2 * 60 * 1000)
public void testSendAndReceiveAtLeastOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo", AT_LEAST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
@ -261,8 +254,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testSendAndReceiveExactlyOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
@ -270,7 +261,7 @@ public class MQTTTest extends AbstractMQTTTest {
initializeConnection(subscriber);
subscriber.subscribe("foo", EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
byte[] message = subscriber.receive(5000);
@ -287,9 +278,6 @@ public class MQTTTest extends AbstractMQTTTest {
for (int i = 0; i < payload.length; i++) {
payload[i] = '2';
}
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
@ -310,10 +298,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testSendAndReceiveRetainedMessages() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
@ -348,9 +332,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 30 * 1000)
public void testValidZeroLengthClientId() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setCleanSession(true);
@ -362,9 +343,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 2 * 60 * 1000)
public void testMQTTPathPatterns() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setCleanSession(true);
@ -434,9 +412,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testMQTTRetainQoS() throws Exception {
addMQTTConnector();
brokerService.start();
String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" };
for (int i = 0; i < topics.length; i++) {
final String topic = topics[i];
@ -480,9 +455,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testDuplicateSubscriptions() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2);
@ -528,9 +500,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 120 * 1000)
public void testRetainedMessage() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short) 2);
@ -597,59 +566,8 @@ public class MQTTTest extends AbstractMQTTTest {
}
}
@Test(timeout = 60 * 1000)
public void testFailedSubscription() throws Exception {
addMQTTConnector();
final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin();
authenticationPlugin.setAnonymousAccessAllowed(true);
final String ANONYMOUS = "anonymous";
authenticationPlugin.setAnonymousGroup(ANONYMOUS);
final DefaultAuthorizationMap map = new DefaultAuthorizationMap();
// only one authorized destination, anonymous for anonymous group!
map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS));
final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map, map, map));
brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin });
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String NAMED = "named";
byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
assertEquals((byte) 0x80, qos[0]);
assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);
// validate the subscription by sending a retained message
connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack();
connection.unsubscribe(new String[] { ANONYMOUS });
qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) });
assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack();
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testUniqueMessageIds() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2);
@ -737,9 +655,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testResendMessageId() throws Exception {
addMQTTConnector("trace=true");
brokerService.start();
final MQTT mqtt = createMQTTConnection("resend", false);
mqtt.setKeepAlive((short) 5);
@ -806,9 +721,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 90 * 1000)
public void testPacketIdGeneratorNonCleanSession() throws Exception {
addMQTTConnector("trace=true");
brokerService.start();
final MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
mqtt.setKeepAlive((short) 15);
@ -882,9 +794,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 90 * 1000)
public void testPacketIdGeneratorCleanSession() throws Exception {
addMQTTConnector("trace=true");
brokerService.start();
final String[] cleanClientIds = new String[] { "", "clean-packetid", null };
final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
MQTT[] mqtts = new MQTT[cleanClientIds.length];
@ -944,9 +853,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testClientConnectionFailure() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection("reconnect", false);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
@ -983,9 +889,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testCleanSession() throws Exception {
addMQTTConnector();
brokerService.start();
final String CLIENTID = "cleansession";
final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
BlockingConnection notClean = mqttNotClean.blockingConnection();
@ -1025,10 +928,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
final String DESTINATION_NAME = "foo.*";
@ -1037,7 +936,7 @@ public class MQTTTest extends AbstractMQTTTest {
final String RETAINED = "RETAINED";
provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
// MUST set to true to receive retained messages
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
@ -1052,7 +951,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
message = (ActiveMQMessage) consumer.receive(5000);
@ -1067,13 +966,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 2 * 60 * 1000)
public void testSendJMSReceiveMQTT() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -1094,7 +990,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertNotNull("Should get retained message", message);
assertEquals(RETAINED, new String(message));
for (int i = 0; i < numberOfMessages; i++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "This is Test Message: " + i;
sendMessage = s.createTextMessage(payload);
producer.send(sendMessage);
@ -1109,8 +1005,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testPingKeepsInactivityMonitorAlive() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2);
@ -1130,8 +1024,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testTurnOffInactivityMonitor() throws Exception {
addMQTTConnector("transport.useInactivityMonitor=false");
brokerService.start();
stopBroker();
protocolConfig = "transport.useInactivityMonitor=false";
startBroker();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo3");
mqtt.setKeepAlive((short) 2);
@ -1151,13 +1047,8 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
addMQTTConnector();
addOpenwireConnector();
brokerService.start();
// start up jms consumer
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + openwireConnector.getConnectUri().getPort());
Connection jmsConn = factory.createConnection();
Connection jmsConn = cf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createTopic("test.foo");
MessageConsumer consumer = session.createConsumer(dest);
@ -1204,8 +1095,6 @@ public class MQTTTest extends AbstractMQTTTest {
payload[i] = '2';
}
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("MQTT-Client");
mqtt.setCleanSession(false);
@ -1245,10 +1134,7 @@ public class MQTTTest extends AbstractMQTTTest {
int numberOfRuns = 100;
int messagesPerRun = 2;
addMQTTConnector("trace=true");
brokerService.start();
final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
final BlockingConnection connectionPub = mqttPub.blockingConnection();
@ -1298,9 +1184,10 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 30 * 1000)
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds
addMQTTConnector("transport.defaultKeepAlive=2000");
brokerService.start();
stopBroker();
protocolConfig = "transport.defaultKeepAlive=2000";
startBroker();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 0);
@ -1318,9 +1205,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testReuseConnection() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("Test-Client");
@ -1340,9 +1224,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception {
addMQTTConnector();
brokerService.setPersistent(true);
brokerService.start();
Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true);
@ -1395,8 +1276,6 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout = 60 * 1000)
public void testMQTT311Connection() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setVersion("3.1.1");
@ -1405,64 +1284,8 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testWildcardRetainedSubscription() throws Exception {
addMQTTConnector();
LinkedList<AuthenticationUser> users = new LinkedList<AuthenticationUser>();
users.add(new AuthenticationUser("user", "user", "users"));
users.add(new AuthenticationUser("admin", "admin", "admins"));
final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
DefaultAuthorizationMap map = new DefaultAuthorizationMap();
LinkedList<DestinationMapEntry> authz = new LinkedList<DestinationMapEntry>();
AuthorizationEntry entryOne = new AuthorizationEntry();
entryOne.setDestination(new ActiveMQTopic("one"));
entryOne.setAdmin("admins");
entryOne.setRead("admins");
entryOne.setWrite("admins");
authz.add(entryOne);
AuthorizationEntry entryTwo = new AuthorizationEntry();
entryTwo.setDestination(new ActiveMQTopic("two"));
entryTwo.setAdmin("users");
entryTwo.setRead("users");
entryTwo.setWrite("users");
authz.add(entryTwo);
map.setAuthorizationEntries(authz);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map);
brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin });
brokerService.start();
MQTT mqttPub = createMQTTConnection("pub", true);
mqttPub.setUserName("admin");
mqttPub.setPassword("admin");
BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect();
connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
MQTT mqttSub = createMQTTConnection("sub", true);
mqttSub.setUserName("user");
mqttSub.setPassword("user");
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
assertNull("Shouldn't receive the message", msg);
}
@Test(timeout = 60 * 1000)
public void testActiveMQRecoveryPolicy() throws Exception {
addMQTTConnector();
brokerService.start();
// test with ActiveMQ LastImageSubscriptionRecoveryPolicy
final PolicyMap policyMap = new PolicyMap();
final PolicyEntry policyEntry = new PolicyEntry();
@ -1507,49 +1330,25 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
}
@Override
protected String getProtocolScheme() {
return "mqtt";
}
@Test(timeout = 60 * 1000)
public void testPingOnMQTT() throws Exception {
stopBroker();
protocolConfig = "maxInactivityDuration=-1";
startBroker();
protected MQTTClientProvider getMQTTClientProvider() {
return new FuseMQQTTClientProvider();
}
protected MQTT createMQTTConnection() throws Exception {
return createMQTTConnection(null, false);
}
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setTracer(createTracer());
if (clientId != null) {
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
// shut off connect retry
return mqtt;
}
protected Tracer createTracer() {
return new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client Received:\n" + frame);
}
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("test-mqtt");
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client Sent:\n" + frame);
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
}));
@Override
public void debug(String message, Object... args) {
LOG.info(String.format(message, args));
}
};
connection.disconnect();
}
}

View File

@ -20,14 +20,20 @@ package org.apache.activemq.transport.mqtt;
import java.io.File;
import java.io.IOException;
import java.security.ProtectionDomain;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
@ -36,14 +42,6 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
@ -52,6 +50,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,13 +60,17 @@ public class MQTTTestSupport {
protected BrokerService brokerService;
protected int port;
protected int sslPort;
protected int nioPort;
protected int nioSslPort;
protected String jmsUri = "vm://localhost";
protected ActiveMQConnectionFactory cf;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
protected boolean persistent;
protected String protocolConfig;
@Parameter(0)
public String protocolScheme;
@Parameter(1)
public boolean useSSL;
public static final int AT_MOST_ONCE = 0;
public static final int AT_LEAST_ONCE = 1;
@ -80,18 +83,14 @@ public class MQTTTestSupport {
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
public static void main(String[] args) throws Exception {
final MQTTTestSupport s = new MQTTTestSupport();
public MQTTTestSupport() {
this.protocolScheme = "mqtt";
this.useSSL = false;
}
s.sslPort = 5675;
s.port = 5676;
s.nioPort = 5677;
s.nioSslPort = 5678;
s.startBroker();
while(true) {
Thread.sleep(100000);
}
public MQTTTestSupport(String connectorScheme, boolean useSsl) {
this.protocolScheme = connectorScheme;
this.useSSL = useSsl;
}
public String getName() {
@ -100,8 +99,16 @@ public class MQTTTestSupport {
@Before
public void setUp() throws Exception {
String basedir = basedir().getPath();
System.setProperty("javax.net.ssl.trustStore", basedir + "/src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", basedir + "/src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
exceptions.clear();
numberOfMessages = 1000;
startBroker();
}
@ -162,84 +169,16 @@ public class MQTTTestSupport {
brokerService = new BrokerService();
brokerService.setPersistent(isPersistent());
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(true);
brokerService.setSchedulerSupport(isSchedulerSupportEnabled());
brokerService.setPopulateJMSXUserID(true);
brokerService.setSchedulerSupport(true);
JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl();
jobStore.setDirectory(new File("activemq-data"));
brokerService.setJobSchedulerStore(jobStore);
}
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("system", "manager", "users,admins"));
users.add(new AuthenticationUser("user", "password", "users"));
users.add(new AuthenticationUser("guest", "password", "guests"));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
return authenticationPlugin;
return null;
}
protected BrokerPlugin configureAuthorization() throws Exception {
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("ActiveMQ.Advisory.>");
entry.setRead("guests,users");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("admins");
tempEntry.setWrite("admins");
tempEntry.setAdmin("admins");
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
return authorizationPlugin;
return null;
}
protected void applyBrokerPolicies() throws Exception {
@ -255,8 +194,16 @@ public class MQTTTestSupport {
// Overrides of this method can add additional configuration options or add multiple
// MQTT transport connectors as needed, the port variable is always supposed to be
// assigned the primary MQTT connector's port.
TransportConnector connector = brokerService.addConnector(getProtocolScheme() + "://0.0.0.0:" + port);
port = connector.getConnectUri().getPort();
StringBuilder connectorURI = new StringBuilder();
connectorURI.append(getProtocolScheme());
connectorURI.append("://0.0.0.0:").append(port);
if (protocolConfig != null && !protocolConfig.isEmpty()) {
connectorURI.append("?").append(protocolConfig);
}
port = brokerService.addConnector(connectorURI.toString()).getConnectUri().getPort();
LOG.info("Added connector {} to broker", getProtocolScheme());
}
public void stopBroker() throws Exception {
@ -299,7 +246,7 @@ public class MQTTTestSupport {
/**
* Initialize an MQTTClientProvider instance. By default this method uses the port that's
* assigned to be the TCP based port using the base version of addMQTTConnector. A sbuclass
* assigned to be the TCP based port using the base version of addMQTTConnector. A subclass
* can either change the value of port or override this method to assign the correct port.
*
* @param provider
@ -308,14 +255,41 @@ public class MQTTTestSupport {
* @throws Exception if an error occurs during initialization.
*/
protected void initializeConnection(MQTTClientProvider provider) throws Exception {
provider.connect("tcp://localhost:" + port);
if (!isUseSSL()) {
provider.connect("tcp://localhost:" + port);
} else {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[] { new DefaultTrustManager() }, new SecureRandom());
provider.setSslContext(ctx);
provider.connect("ssl://localhost:" + port);
}
}
protected String getProtocolScheme() {
return "mqtt";
public String getProtocolScheme() {
return protocolScheme;
}
protected boolean isPersistent() {
public void setProtocolScheme(String scheme) {
this.protocolScheme = scheme;
}
public boolean isUseSSL() {
return this.useSSL;
}
public void setUseSSL(boolean useSSL) {
this.useSSL = useSSL;
}
public boolean isPersistent() {
return persistent;
}
public int getPort() {
return this.port;
}
public boolean isSchedulerSupportEnabled() {
return false;
}
@ -355,6 +329,14 @@ public class MQTTTestSupport {
}
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
if (isUseSSL()) {
return createMQTTSslConnection(clientId, clean);
} else {
return createMQTTTcpConnection(clientId, clean);
}
}
private MQTT createMQTTTcpConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
@ -367,6 +349,23 @@ public class MQTTTestSupport {
return mqtt;
}
private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setTracer(createTracer());
mqtt.setHost("ssl://localhost:" + port);
if (clientId != null) {
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[] { new DefaultTrustManager() }, new SecureRandom());
mqtt.setSslContext(ctx);
return mqtt;
}
protected Tracer createTracer() {
return new Tracer() {
@Override
@ -385,4 +384,20 @@ public class MQTTTestSupport {
}
};
}
static class DefaultTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}

View File

@ -1,71 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.net.ProtocolException;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTTests extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTests.class);
@Test(timeout = 60 * 1000)
public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
MQTT mqttPub = createMQTTConnection("pub", true);
mqttPub.setUserName("admin");
mqttPub.setPassword("admin");
mqttPub.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received: {}", frame);
if (frame.messageType() == CONNACK.TYPE) {
CONNACK connAck = new CONNACK();
try {
connAck.decode(frame);
LOG.info("{}", connAck);
assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
}
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent: {}", frame);
}
});
BlockingConnection connectionPub = mqttPub.blockingConnection();
try {
connectionPub.connect();
fail("Should not be able to connect.");
} catch (Exception e) {}
}
}

View File

@ -1,119 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PahoMQTNioTTest extends PahoMQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class);
@Override
protected String getProtocolScheme() {
return "mqtt+nio";
}
@Test(timeout = 300000)
public void testLotsOfClients() throws Exception {
final int CLIENTS = Integer.getInteger("PahoMQTNioTTest.CLIENTS", 100);
LOG.info("Using: " + CLIENTS + " clients");
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
final AtomicInteger receiveCounter = new AtomicInteger();
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
receiveCounter.incrementAndGet();
}
});
final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>();
final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch sendBarrier = new CountDownLatch(1);
for (int i = 0; i < CLIENTS; i++) {
Thread.sleep(10);
new Thread(null, null, "client:" + i) {
@Override
public void run() {
try {
MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(),
new MemoryPersistence());
client.connect();
connectedDoneLatch.countDown();
sendBarrier.await();
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
client.publish("test", "hello".getBytes(), 1, false);
}
client.disconnect();
client.close();
} catch (Throwable e) {
e.printStackTrace();
asyncError.set(e);
} finally {
disconnectDoneLatch.countDown();
}
}
}.start();
}
connectedDoneLatch.await();
assertNull("Async error: " + asyncError.get(), asyncError.get());
sendBarrier.countDown();
LOG.info("All clients connected... waiting to receive sent messages...");
// We should eventually get all the messages.
within(30, TimeUnit.SECONDS, new Task() {
@Override
public void run() throws Exception {
assertTrue(receiveCounter.get() == CLIENTS * 10);
}
});
LOG.info("All messages received.");
disconnectDoneLatch.await();
assertNull("Async error: " + asyncError.get(), asyncError.get());
}
}

View File

@ -16,31 +16,125 @@
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PahoMQTTTest extends AbstractMQTTTest {
@RunWith(Parameterized.class)
public class PahoMQTTTest extends MQTTTestSupport {
@Test(timeout=300000)
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
@Parameters(name= "{index}: scheme({0})")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"mqtt", false},
{"mqtt+nio", false}
});
}
@Test(timeout = 300000)
public void testLotsOfClients() throws Exception {
final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
LOG.info("Using: {} clients", CLIENTS);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid", new MemoryPersistence());
final AtomicInteger receiveCounter = new AtomicInteger();
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
receiveCounter.incrementAndGet();
}
});
final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>();
final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch sendBarrier = new CountDownLatch(1);
for (int i = 0; i < CLIENTS; i++) {
Thread.sleep(10);
new Thread(null, null, "client:" + i) {
@Override
public void run() {
try {
MqttClient client = new MqttClient("tcp://localhost:" + getPort(),
Thread.currentThread().getName(),
new MemoryPersistence());
client.connect();
connectedDoneLatch.countDown();
sendBarrier.await();
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
client.publish("test", "hello".getBytes(), 1, false);
}
client.disconnect();
client.close();
} catch (Throwable e) {
e.printStackTrace();
asyncError.set(e);
} finally {
disconnectDoneLatch.countDown();
}
}
}.start();
}
connectedDoneLatch.await();
assertNull("Async error: " + asyncError.get(), asyncError.get());
sendBarrier.countDown();
LOG.info("All clients connected... waiting to receive sent messages...");
// We should eventually get all the messages.
within(30, TimeUnit.SECONDS, new Task() {
@Override
public void run() throws Exception {
assertTrue(receiveCounter.get() == CLIENTS * 10);
}
});
LOG.info("All messages received.");
disconnectDoneLatch.await();
assertNull("Async error: " + asyncError.get(), asyncError.get());
}
@Test(timeout=300000)
public void testSendAndReceiveMQTT() throws Exception {
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence());
client.connect();
client.publish("test", "hello".getBytes(), 1, false);