Fixes AMQ-4167 and AMQ-4168, also changes the MQTT tests to use dynamic ports to avoid port conflicts. Fixes SSL tests.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1407497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-11-09 15:42:41 +00:00
parent e8f12d8360
commit a68916dcc5
9 changed files with 147 additions and 76 deletions

View File

@ -1,26 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.broker.BrokerService;
public class MTQQNioTest extends MQTTTest {
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt+nio://localhost:1883?maxInactivityDuration=-1");
}
}

View File

@ -138,6 +138,13 @@
<!-- =============================== -->
<!-- Testing Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -28,25 +28,27 @@ import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Ignore;
@Ignore("hangs atm, needs investigation")
public class MQTTSSLTest extends MQTTTest {
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
String basedir = basedir().getPath();
System.setProperty("javax.net.ssl.trustStore", basedir+"/src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStore", basedir+"/src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.startBroker();
}
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt+ssl://localhost:8883");
@Override
protected String getProtocolScheme() {
return "mqtt+ssl";
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("ssl://localhost:8883");
mqtt.setHost("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(ctx);
@ -65,5 +67,4 @@ public class MQTTSSLTest extends MQTTTest {
return new X509Certificate[0];
}
}
}

View File

@ -22,9 +22,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -55,12 +58,19 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTTest {
public File basedir() throws IOException {
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
@ -88,9 +98,9 @@ public class MQTTTest {
@Test
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
MQTT mqtt = new MQTT();
MQTT mqtt = createMQTTConnection();
final BlockingConnection subscribeConnection = mqtt.blockingConnection();
subscribeConnection.connect();
Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
@ -131,7 +141,7 @@ public class MQTTTest {
@Test
public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
@ -153,7 +163,7 @@ public class MQTTTest {
@Test
public void testSendAndReceiveAtLeastOnce() throws Exception {
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
@ -176,7 +186,7 @@ public class MQTTTest {
@Test
public void testSendAndReceiveExactlyOnce() throws Exception {
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
MQTT publisher = createMQTTConnection();
BlockingConnection pubConnection = publisher.blockingConnection();
@ -209,7 +219,7 @@ public class MQTTTest {
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
MQTT publisher = createMQTTConnection();
@ -238,15 +248,15 @@ public class MQTTTest {
@Test
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector(brokerService);
brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection();
final String DESTINATION_NAME = "foo.*";
connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
@ -268,15 +278,15 @@ public class MQTTTest {
@Test
public void testSendJMSReceiveMQTT() throws Exception {
addMQTTConnector(brokerService);
brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
@ -296,12 +306,10 @@ public class MQTTTest {
connection.disconnect();
}
@Test
public void testInactivityTimeoutDisconnectsClient() throws Exception{
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
final TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
// from timing out
@ -335,7 +343,7 @@ public class MQTTTest {
private Transport createManualMQTTClient() throws IOException, URISyntaxException {
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
new URI("tcp://localhost:1883"), null);
new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
@ -359,7 +367,7 @@ public class MQTTTest {
@Test
public void testPingKeepsInactivityMonitorAlive() throws Exception {
addMQTTConnector(brokerService);
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
@ -379,7 +387,7 @@ public class MQTTTest {
@Test
public void testTurnOffInactivityMonitor()throws Exception{
addMQTTConnector(brokerService, "?transport.useInactivityMonitor=false");
addMQTTConnector("?transport.useInactivityMonitor=false");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
@ -397,30 +405,11 @@ public class MQTTTest {
connection.disconnect();
}
@Test
public void testPingOnMQTTNIO() throws Exception {
brokerService.addConnector("mqtt+nio://localhost:1883");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
}));
connection.disconnect();
}
@Test
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds
brokerService.addConnector("mqtt://localhost:1883?transport.defaultKeepAlive=2000");
addMQTTConnector("?transport.defaultKeepAlive=2000");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)0);
@ -436,17 +425,23 @@ public class MQTTTest {
}));
}
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
TransportConnector mqttConnector;
protected String getProtocolScheme() {
return "mqtt";
}
protected void addMQTTConnector(BrokerService brokerService, String config) throws Exception {
brokerService.addConnector("mqtt://localhost:1883" + config);
protected void addMQTTConnector() throws Exception {
addMQTTConnector("");
}
protected void addMQTTConnector(String config) throws Exception {
mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
// shut off connect retry
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class MTQQNioTest extends MQTTTest {
@Override
protected String getProtocolScheme() {
return "mqtt+nio";
}
@Test
public void testPingOnMQTTNIO() throws Exception {
addMQTTConnector("?maxInactivityDuration=-1");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
}));
connection.disconnect();
}
}

Binary file not shown.

View File

@ -0,0 +1,42 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
#log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true

Binary file not shown.