diff --git a/activemq-core/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/activemq-broker/src/test/java/org/apache/activemq/AutoFailTestSupport.java
similarity index 100%
rename from activemq-core/src/test/java/org/apache/activemq/AutoFailTestSupport.java
rename to activemq-broker/src/test/java/org/apache/activemq/AutoFailTestSupport.java
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
deleted file mode 100644
index 74cfeb28ad..0000000000
--- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import org.apache.activemq.broker.BrokerService;
-
-public class MTQQNioTest extends MQTTTest {
- protected void addMQTTConnector(BrokerService brokerService) throws Exception {
- brokerService.addConnector("mqtt+nio://localhost:1883?maxInactivityDuration=-1");
- }
-
-}
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 2234aa4fa6..cd3f051dc4 100755
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -138,6 +138,13 @@
+
+ ${project.groupId}
+ activemq-broker
+ test-jar
+ test
+
+
junit
junit
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
similarity index 85%
rename from activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
rename to activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
index 6697bef364..a7c47d2d85 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
@@ -28,25 +28,27 @@ import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Ignore;
-@Ignore("hangs atm, needs investigation")
public class MQTTSSLTest extends MQTTTest {
+
public void startBroker() throws Exception {
- System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+ String basedir = basedir().getPath();
+ System.setProperty("javax.net.ssl.trustStore", basedir+"/src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
- System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStore", basedir+"/src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.startBroker();
}
- protected void addMQTTConnector(BrokerService brokerService) throws Exception {
- brokerService.addConnector("mqtt+ssl://localhost:8883");
+ @Override
+ protected String getProtocolScheme() {
+ return "mqtt+ssl";
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
- mqtt.setHost("ssl://localhost:8883");
+ mqtt.setHost("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(ctx);
@@ -65,5 +67,4 @@ public class MQTTSSLTest extends MQTTTest {
return new X509Certificate[0];
}
}
-
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
similarity index 88%
rename from activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
rename to activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 5c46697d6c..d422376ffa 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -22,9 +22,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -55,12 +58,19 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTTest {
+
+ public File basedir() throws IOException {
+ ProtectionDomain protectionDomain = getClass().getProtectionDomain();
+ return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
+ }
+
protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
protected LinkedList exceptions = new LinkedList();
@@ -88,9 +98,9 @@ public class MQTTTest {
@Test
public void testSendAndReceiveMQTT() throws Exception {
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
- MQTT mqtt = new MQTT();
+ MQTT mqtt = createMQTTConnection();
final BlockingConnection subscribeConnection = mqtt.blockingConnection();
subscribeConnection.connect();
Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
@@ -131,7 +141,7 @@ public class MQTTTest {
@Test
public void testSendAndReceiveAtMostOnce() throws Exception {
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
@@ -153,7 +163,7 @@ public class MQTTTest {
@Test
public void testSendAndReceiveAtLeastOnce() throws Exception {
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
@@ -176,7 +186,7 @@ public class MQTTTest {
@Test
public void testSendAndReceiveExactlyOnce() throws Exception {
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
MQTT publisher = createMQTTConnection();
BlockingConnection pubConnection = publisher.blockingConnection();
@@ -209,7 +219,7 @@ public class MQTTTest {
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
MQTT publisher = createMQTTConnection();
@@ -238,15 +248,15 @@ public class MQTTTest {
@Test
public void testSendMQTTReceiveJMS() throws Exception {
- addMQTTConnector(brokerService);
- brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
+ addMQTTConnector();
+ TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection();
final String DESTINATION_NAME = "foo.*";
connection.connect();
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
+ ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
@@ -268,15 +278,15 @@ public class MQTTTest {
@Test
public void testSendJMSReceiveMQTT() throws Exception {
- addMQTTConnector(brokerService);
- brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
+ addMQTTConnector();
+ TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
+ ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
@@ -296,12 +306,10 @@ public class MQTTTest {
connection.disconnect();
}
- @Test
public void testInactivityTimeoutDisconnectsClient() throws Exception{
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
- final TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
// from timing out
@@ -335,7 +343,7 @@ public class MQTTTest {
private Transport createManualMQTTClient() throws IOException, URISyntaxException {
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
- new URI("tcp://localhost:1883"), null);
+ new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
@@ -359,7 +367,7 @@ public class MQTTTest {
@Test
public void testPingKeepsInactivityMonitorAlive() throws Exception {
- addMQTTConnector(brokerService);
+ addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
@@ -379,7 +387,7 @@ public class MQTTTest {
@Test
public void testTurnOffInactivityMonitor()throws Exception{
- addMQTTConnector(brokerService, "?transport.useInactivityMonitor=false");
+ addMQTTConnector("?transport.useInactivityMonitor=false");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
@@ -397,30 +405,11 @@ public class MQTTTest {
connection.disconnect();
}
- @Test
- public void testPingOnMQTTNIO() throws Exception {
- brokerService.addConnector("mqtt+nio://localhost:1883");
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive((short)2);
- final BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
-
- assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return connection.isConnected();
- }
- }));
-
- connection.disconnect();
- }
@Test
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds
- brokerService.addConnector("mqtt://localhost:1883?transport.defaultKeepAlive=2000");
+ addMQTTConnector("?transport.defaultKeepAlive=2000");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)0);
@@ -436,17 +425,23 @@ public class MQTTTest {
}));
}
- protected void addMQTTConnector(BrokerService brokerService) throws Exception {
- brokerService.addConnector("mqtt://localhost:1883");
+ TransportConnector mqttConnector;
+
+ protected String getProtocolScheme() {
+ return "mqtt";
}
- protected void addMQTTConnector(BrokerService brokerService, String config) throws Exception {
- brokerService.addConnector("mqtt://localhost:1883" + config);
+ protected void addMQTTConnector() throws Exception {
+ addMQTTConnector("");
+ }
+
+ protected void addMQTTConnector(String config) throws Exception {
+ mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
- mqtt.setHost("localhost", 1883);
+ mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
// shut off connect retry
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
new file mode 100644
index 0000000000..c839f09431
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class MTQQNioTest extends MQTTTest {
+
+ @Override
+ protected String getProtocolScheme() {
+ return "mqtt+nio";
+ }
+
+ @Test
+ public void testPingOnMQTTNIO() throws Exception {
+ addMQTTConnector("?maxInactivityDuration=-1");
+ brokerService.start();
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short)2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return connection.isConnected();
+ }
+ }));
+
+ connection.disconnect();
+ }
+
+}
diff --git a/activemq-mqtt/src/test/resources/client.keystore b/activemq-mqtt/src/test/resources/client.keystore
new file mode 100755
index 0000000000..6ab1286b15
Binary files /dev/null and b/activemq-mqtt/src/test/resources/client.keystore differ
diff --git a/activemq-mqtt/src/test/resources/log4j.properties b/activemq-mqtt/src/test/resources/log4j.properties
new file mode 100755
index 0000000000..7cc19418fd
--- /dev/null
+++ b/activemq-mqtt/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=INFO, out, stdout
+
+#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
+#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
+#log4j.logger.org.apache.activemq.transport.failover=TRACE
+#log4j.logger.org.apache.activemq.store.jdbc=TRACE
+#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
+#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
+#log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n
+log4j.appender.out.file=target/activemq-test.log
+log4j.appender.out.append=true
diff --git a/activemq-mqtt/src/test/resources/server.keystore b/activemq-mqtt/src/test/resources/server.keystore
new file mode 100755
index 0000000000..b8af477f19
Binary files /dev/null and b/activemq-mqtt/src/test/resources/server.keystore differ