fixed some failing tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1499843 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-07-04 18:57:49 +00:00
parent 33ec1cf99b
commit 72e484c68f
4 changed files with 61 additions and 7 deletions

View File

@ -16,20 +16,18 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertArrayEquals;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.security.ProtectionDomain; import java.security.ProtectionDomain;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.AutoFailTestSupport;
@ -40,6 +38,7 @@ import org.apache.activemq.util.ByteSequence;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
public abstract class AbstractMQTTTest extends AutoFailTestSupport { public abstract class AbstractMQTTTest extends AutoFailTestSupport {
protected TransportConnector mqttConnector; protected TransportConnector mqttConnector;
@ -78,6 +77,50 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
super.tearDown(); super.tearDown();
} }
@Test(timeout=300000)
public void testWillNotSentOnClose() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
String willTopic = "lastWillAndTestament";
subscriptionProvider.subscribe(willTopic,AT_MOST_ONCE);
final AtomicInteger count = new AtomicInteger();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1; i++){
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNull("Should get a message", payload);
count.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
publishProvider.setWillTopic(willTopic);
publishProvider.setWillMessage("EverythingGoesToRob");
initializeConnection(publishProvider);
Thread.sleep(1000);
publishProvider.disconnect();
assertEquals(0, count.get());
subscriptionProvider.disconnect();
publishProvider.disconnect();
}
@Test(timeout=300000) @Test(timeout=300000)
public void testSendAndReceiveMQTT() throws Exception { public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector(); addMQTTConnector();

View File

@ -72,4 +72,14 @@ class FuseMQQTTClientProvider implements MQTTClientProvider {
public void setSslContext(SSLContext sslContext) { public void setSslContext(SSLContext sslContext) {
mqtt.setSslContext(sslContext); mqtt.setSslContext(sslContext);
} }
@Override
public void setWillMessage(String string) {
mqtt.setWillMessage(string);
}
@Override
public void setWillTopic(String topic) {
mqtt.setWillTopic(topic);
}
} }

View File

@ -23,5 +23,7 @@ public interface MQTTClientProvider {
void subscribe(String topic,int qos) throws Exception; void subscribe(String topic,int qos) throws Exception;
byte[] receive(int timeout) throws Exception; byte[] receive(int timeout) throws Exception;
void setSslContext(javax.net.ssl.SSLContext sslContext); void setSslContext(javax.net.ssl.SSLContext sslContext);
void setWillMessage(String string);
void setWillTopic(String topic);
} }

View File

@ -39,6 +39,7 @@ public class MQTTTest extends AbstractMQTTTest {
addMQTTConnector(); addMQTTConnector();
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2); mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection(); final BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
@ -59,6 +60,7 @@ public class MQTTTest extends AbstractMQTTTest {
addMQTTConnector("transport.useInactivityMonitor=false"); addMQTTConnector("transport.useInactivityMonitor=false");
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo3");
mqtt.setKeepAlive((short)2); mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection(); final BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
@ -192,6 +194,7 @@ public class MQTTTest extends AbstractMQTTTest {
addMQTTConnector("transport.defaultKeepAlive=2000"); addMQTTConnector("transport.defaultKeepAlive=2000");
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)0); mqtt.setKeepAlive((short)0);
final BlockingConnection connection = mqtt.blockingConnection(); final BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
@ -232,10 +235,6 @@ public class MQTTTest extends AbstractMQTTTest {
return "mqtt"; return "mqtt";
} }
@Override
protected void addMQTTConnector() throws Exception {
addMQTTConnector();
}
@Override @Override
protected MQTTClientProvider getMQTTClientProvider() { protected MQTTClientProvider getMQTTClientProvider() {